You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by jf...@apache.org on 2019/08/21 15:19:31 UTC
[plc4x] 02/02: Further improvements.
This is an automated email from the ASF dual-hosted git repository.
jfeinauer pushed a commit to branch feature/new-api
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit c1e9d2e70eee03fd39b14ceedf2c6d2a8834ab20
Author: Julian Feinauer <j....@pragmaticminds.de>
AuthorDate: Wed Aug 21 17:19:23 2019 +0200
Further improvements.
---
.../apache/plc4x/java/base/next/PlcConnection.java | 8 ++++-
.../plc4x/java/base/next/PlcConnectionImpl.java | 15 ++++++++-
.../apache/plc4x/java/base/next/PlcProtocol.java | 5 ++-
.../plc4x/java/base/next/netty/PlcApiCodec.java | 36 +++++++++-------------
4 files changed, 38 insertions(+), 26 deletions(-)
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcConnection.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcConnection.java
index 8863ad1..1ea2021 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcConnection.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcConnection.java
@@ -16,5 +16,11 @@ public interface PlcConnection {
* Is called from downstream to notify the Connection object, that there is a response
* for one of its requests.
*/
- void respond(Response response);
+ void handleResponse(Response response);
+
+ /**
+ * This is the handler for an exception which comes from the PIPELINE.
+ * Default behavior is to cancel all pending requests.
+ */
+ void fireException(Throwable cause);
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcConnectionImpl.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcConnectionImpl.java
index 38e81af..0e26690 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcConnectionImpl.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcConnectionImpl.java
@@ -6,6 +6,8 @@ import org.apache.plc4x.java.base.next.commands.FailedRequest;
import org.apache.plc4x.java.base.next.commands.ReadRequest;
import org.apache.plc4x.java.base.next.commands.Response;
import org.apache.plc4x.java.base.next.netty.PlcApiCodec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -18,6 +20,8 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class PlcConnectionImpl implements PlcConnection {
+ private static final Logger logger = LoggerFactory.getLogger(PlcConnectionImpl.class);
+
private final PlcDriver driver;
private final PlcApiCodec codec;
private final Channel channel;
@@ -69,7 +73,7 @@ public class PlcConnectionImpl implements PlcConnection {
throw new NotImplementedException("");
}
- @Override public void respond(Response response) {
+ @Override public void handleResponse(Response response) {
// Take the necessary action
assert requests.containsKey(response.getTransactionId());
final RequestInformation information = requests.get(response.getTransactionId());
@@ -79,4 +83,13 @@ public class PlcConnectionImpl implements PlcConnection {
throw new IllegalStateException("This type of response " + response.getClass() + " is not implemented");
}
}
+
+ @Override public void fireException(Throwable cause) {
+ logger.warn("Catched an exception, cancel all pending requests", cause);
+ // Find all active requests (future not DONE), and cancel them
+ requests.values().stream()
+ .map(RequestInformation::getFuture)
+ .filter(f -> !f.isDone())
+ .forEach(f -> f.completeExceptionally(cause));
+ }
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcProtocol.java
index a092753..2d11646 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcProtocol.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcProtocol.java
@@ -21,7 +21,6 @@ public interface PlcProtocol {
*/
Response decode(ByteBuf buf) throws UnableToParseException;
- class UnableToParseException extends Exception {
-
- }
+ /** Used by {@link #decode(ByteBuf)} to notify that no bytes where consumed **/
+ class UnableToParseException extends Exception { }
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/netty/PlcApiCodec.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/netty/PlcApiCodec.java
index 81a4cef..1b090f5 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/netty/PlcApiCodec.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/netty/PlcApiCodec.java
@@ -3,23 +3,25 @@ package org.apache.plc4x.java.base.next.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.ByteToMessageCodec;
import io.netty.util.ReferenceCountUtil;
import org.apache.plc4x.java.base.next.PlcConnection;
import org.apache.plc4x.java.base.next.PlcConnectionImpl;
import org.apache.plc4x.java.base.next.PlcProtocol;
+import org.apache.plc4x.java.base.next.commands.Message;
import org.apache.plc4x.java.base.next.commands.ReadRequest;
import org.apache.plc4x.java.base.next.commands.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
+
/**
* This class is the binding element between a {@link org.apache.plc4x.java.base.next.PlcProtocol},
* netty and the High Level API classes like @{@link org.apache.plc4x.java.base.next.PlcConnection}.
*/
-public class PlcApiCodec extends ChannelDuplexHandler {
+public class PlcApiCodec extends ByteToMessageCodec<Message> {
private static final Logger logger = LoggerFactory.getLogger(PlcApiCodec.class);
@@ -50,15 +52,7 @@ public class PlcApiCodec extends ChannelDuplexHandler {
// ?
}
- @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
- // ?
- }
-
- @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- // ?
- }
-
- @Override public void write(ChannelHandlerContext ctx, Object command, ChannelPromise promise) throws Exception {
+ @Override protected void encode(ChannelHandlerContext ctx, Message command, ByteBuf out) throws Exception {
// Check exact message type and forward
if (command instanceof ReadRequest) {
logger.debug("Received read request {}", command);
@@ -70,22 +64,22 @@ public class PlcApiCodec extends ChannelDuplexHandler {
}
}
- @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- final ByteBuf buf = (ByteBuf) msg;
+ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
logger.debug("Received message\n"
- + ByteBufUtil.prettyHexDump(buf));
- buf.markReaderIndex();
+ + ByteBufUtil.prettyHexDump(in));
+ in.markReaderIndex();
try {
- Response response = protocol.decode(buf);
- this.plcConnection.respond(response);
+ Response response = protocol.decode(in);
+ this.plcConnection.handleResponse(response);
} catch (PlcProtocol.UnableToParseException e) {
- buf.resetReaderIndex();
+ in.resetReaderIndex();
}
- ReferenceCountUtil.release(msg);
+ ReferenceCountUtil.release(in);
}
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- // General Exception handling!
+ // General Exception handling, notify Connection
+ this.plcConnection.fireException(cause);
}
public void registerConnection(PlcConnectionImpl plcConnection) {