You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by jf...@apache.org on 2019/08/21 15:19:31 UTC

[plc4x] 02/02: Further improvements.

This is an automated email from the ASF dual-hosted git repository.

jfeinauer pushed a commit to branch feature/new-api
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit c1e9d2e70eee03fd39b14ceedf2c6d2a8834ab20
Author: Julian Feinauer <j....@pragmaticminds.de>
AuthorDate: Wed Aug 21 17:19:23 2019 +0200

    Further improvements.
---
 .../apache/plc4x/java/base/next/PlcConnection.java |  8 ++++-
 .../plc4x/java/base/next/PlcConnectionImpl.java    | 15 ++++++++-
 .../apache/plc4x/java/base/next/PlcProtocol.java   |  5 ++-
 .../plc4x/java/base/next/netty/PlcApiCodec.java    | 36 +++++++++-------------
 4 files changed, 38 insertions(+), 26 deletions(-)

diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcConnection.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcConnection.java
index 8863ad1..1ea2021 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcConnection.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcConnection.java
@@ -16,5 +16,11 @@ public interface PlcConnection {
      * Is called from downstream to notify the Connection object, that there is a response
      * for one of its requests.
      */
-    void respond(Response response);
+    void handleResponse(Response response);
+
+    /**
+     * This is the handler for an exception which comes from the PIPELINE.
+     * Default behavior is to cancel all pending requests.
+     */
+    void fireException(Throwable cause);
 }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcConnectionImpl.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcConnectionImpl.java
index 38e81af..0e26690 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcConnectionImpl.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcConnectionImpl.java
@@ -6,6 +6,8 @@ import org.apache.plc4x.java.base.next.commands.FailedRequest;
 import org.apache.plc4x.java.base.next.commands.ReadRequest;
 import org.apache.plc4x.java.base.next.commands.Response;
 import org.apache.plc4x.java.base.next.netty.PlcApiCodec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -18,6 +20,8 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public class PlcConnectionImpl implements PlcConnection {
 
+    private static final Logger logger = LoggerFactory.getLogger(PlcConnectionImpl.class);
+
     private final PlcDriver driver;
     private final PlcApiCodec codec;
     private final Channel channel;
@@ -69,7 +73,7 @@ public class PlcConnectionImpl implements PlcConnection {
         throw new NotImplementedException("");
     }
 
-    @Override public void respond(Response response) {
+    @Override public void handleResponse(Response response) {
         // Take the necessary action
         assert requests.containsKey(response.getTransactionId());
         final RequestInformation information = requests.get(response.getTransactionId());
@@ -79,4 +83,13 @@ public class PlcConnectionImpl implements PlcConnection {
             throw new IllegalStateException("This type of response " + response.getClass() + " is not implemented");
         }
     }
+
+    @Override public void fireException(Throwable cause) {
+        logger.warn("Catched an exception, cancel all pending requests", cause);
+        // Find all active requests (future not DONE), and complete them exceptionally with the cause
+        requests.values().stream()
+            .map(RequestInformation::getFuture)
+            .filter(f -> !f.isDone())
+            .forEach(f -> f.completeExceptionally(cause));
+    }
 }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcProtocol.java
index a092753..2d11646 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcProtocol.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcProtocol.java
@@ -21,7 +21,6 @@ public interface PlcProtocol {
      */
     Response decode(ByteBuf buf) throws UnableToParseException;
 
-    class UnableToParseException extends Exception {
-
-    }
+    /** Used by {@link #decode(ByteBuf)} to notify that no bytes were consumed **/
+    class UnableToParseException extends Exception { }
 }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/netty/PlcApiCodec.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/netty/PlcApiCodec.java
index 81a4cef..1b090f5 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/netty/PlcApiCodec.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/netty/PlcApiCodec.java
@@ -3,23 +3,25 @@ package org.apache.plc4x.java.base.next.netty;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.ByteToMessageCodec;
 import io.netty.util.ReferenceCountUtil;
 import org.apache.plc4x.java.base.next.PlcConnection;
 import org.apache.plc4x.java.base.next.PlcConnectionImpl;
 import org.apache.plc4x.java.base.next.PlcProtocol;
+import org.apache.plc4x.java.base.next.commands.Message;
 import org.apache.plc4x.java.base.next.commands.ReadRequest;
 import org.apache.plc4x.java.base.next.commands.Response;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+
 /**
  * This class is the binding element between a {@link org.apache.plc4x.java.base.next.PlcProtocol},
  * netty and the High Level API classes like @{@link org.apache.plc4x.java.base.next.PlcConnection}.
  */
-public class PlcApiCodec extends ChannelDuplexHandler {
+public class PlcApiCodec extends ByteToMessageCodec<Message> {
 
     private static final Logger logger = LoggerFactory.getLogger(PlcApiCodec.class);
 
@@ -50,15 +52,7 @@ public class PlcApiCodec extends ChannelDuplexHandler {
         // ?
     }
 
-    @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
-        // ?
-    }
-
-    @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
-        // ?
-    }
-
-    @Override public void write(ChannelHandlerContext ctx, Object command, ChannelPromise promise) throws Exception {
+    @Override protected void encode(ChannelHandlerContext ctx, Message command, ByteBuf out) throws Exception {
         // Check exact message type and forward
         if (command instanceof ReadRequest) {
             logger.debug("Received read request {}", command);
@@ -70,22 +64,22 @@ public class PlcApiCodec extends ChannelDuplexHandler {
         }
     }
 
-    @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-        final ByteBuf buf = (ByteBuf) msg;
+    @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
         logger.debug("Received message\n"
-            + ByteBufUtil.prettyHexDump(buf));
-        buf.markReaderIndex();
+            + ByteBufUtil.prettyHexDump(in));
+        in.markReaderIndex();
         try {
-            Response response = protocol.decode(buf);
-            this.plcConnection.respond(response);
+            Response response = protocol.decode(in);
+            this.plcConnection.handleResponse(response);
         } catch (PlcProtocol.UnableToParseException e) {
-            buf.resetReaderIndex();
+            in.resetReaderIndex();
         }
-        ReferenceCountUtil.release(msg);
+        ReferenceCountUtil.release(in);
     }
 
     @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-        // General Exception handling!
+        // General Exception handling, notify Connection
+        this.plcConnection.fireException(cause);
     }
 
     public void registerConnection(PlcConnectionImpl plcConnection) {