You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by jf...@apache.org on 2019/12/19 10:51:44 UTC

[plc4x] branch next-gen-core updated: Added Experimental Annotation. - Implemented default methods for read, write, subscribe, ... - Direct communication from connection to protocol

This is an automated email from the ASF dual-hosted git repository.

jfeinauer pushed a commit to branch next-gen-core
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/next-gen-core by this push:
     new 4681d64  Added Experimental Annotation. - Implemented default methods for read, write, subscribe, ... - Direct communication from connection to protocol
4681d64 is described below

commit 4681d641ec997983d6d1036378ca540f7c9cfcb9
Author: Julian Feinauer <j....@pragmaticminds.de>
AuthorDate: Thu Dec 19 11:51:36 2019 +0100

    Added Experimental Annotation.
    - Implemented default methods for read, write, subscribe, ...
    - Direct communication from connection to protocol
---
 .../org/apache/plc4x/java/PlcDriverManager.java    |  4 +-
 .../java/api/{Changed.java => Experimental.java}   |  2 +-
 .../plc4x/java/api/PlcConnectionExtension.java     | 83 ++++++++++++++++++
 .../plc4x/java/api/messages/PlcReadResponse.java   |  4 +-
 .../plc4x/java/examples/helloplc4x/HelloPlc4x.java |  4 +-
 .../src/main/resources/logback.xml                 |  2 +-
 .../apache/plc4x/java/spi/Plc4xNettyWrapper.java   | 50 +++++++----
 .../apache/plc4x/java/spi/Plc4xProtocolBase.java   | 45 +++++++---
 .../java/spi/connection/AbstractPlcConnection.java | 98 ++++++++++++++++++++--
 .../java/spi/connection/NettyPlcConnection.java    |  8 ++
 .../spi/internal/DefaultSendRequestContext.java    |  6 +-
 .../spi/GeneratedDriverByteToMessageCodec.java     |  2 +
 .../java/s7/readwrite/connection/S7Connection.java | 20 +----
 .../s7/readwrite/protocol/Plc4xS7Protocol.java     | 84 ++++++++++---------
 14 files changed, 313 insertions(+), 99 deletions(-)

diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/PlcDriverManager.java b/plc4j/api/src/main/java/org/apache/plc4x/java/PlcDriverManager.java
index 710cdb1..13efee1 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/PlcDriverManager.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/PlcDriverManager.java
@@ -18,7 +18,7 @@
  */
 package org.apache.plc4x.java;
 
-import org.apache.plc4x.java.api.Changed;
+import org.apache.plc4x.java.api.Experimental;
 import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.authentication.PlcAuthentication;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
@@ -96,7 +96,7 @@ public class PlcDriverManager {
      * @return Driver instance for the given protocol
      * @throws PlcConnectionException If no Suitable Driver can be found
      */
-    @Changed
+    @Experimental
     public PlcDriver getDriver(String url) throws PlcConnectionException {
         try {
             URI connectionUri = new URI(url);
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/Changed.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/Experimental.java
similarity index 96%
rename from plc4j/api/src/main/java/org/apache/plc4x/java/api/Changed.java
rename to plc4j/api/src/main/java/org/apache/plc4x/java/api/Experimental.java
index 5e3946b..8d2d8d3 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/Changed.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/Experimental.java
@@ -22,5 +22,5 @@ package org.apache.plc4x.java.api;
 /**
  * Indicates that this is a recent API Change.
  */
-public @interface Changed {
+public @interface Experimental {
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/PlcConnectionExtension.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/PlcConnectionExtension.java
new file mode 100644
index 0000000..5f36f24
--- /dev/null
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/PlcConnectionExtension.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.api;
+
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+
+import java.util.concurrent.Future;
+
+/**
+ * Suggestion for a new API; both methods are unimplemented placeholders that always throw.
+ */
+@Experimental
+public interface PlcConnectionExtension {
+
+    /**
+     * Placeholder for a string-based query; always throws until an implementation overrides it.
+     * <code>
+     *     "%DB400.xxx"
+     * </code>
+     * <code>
+     *     ["%DB400.xxx", ""]
+     * </code>
+     * <code>
+     *     { "item1": "%DB400.xxx", "item2": "xx" }
+     * </code>
+     * Prepared Statement
+     * <code>
+     *     { "item1": ?, "item2": ? }
+     * </code>
+     *
+     * @param s the query text, in one of the forms sketched above
+     * @param args query arguments (presumably the values for the "?" placeholders -- TODO confirm)
+     * @return a future for the query response (never returned by this default)
+     * @throws UnsupportedOperationException always, as there is no default implementation
+     */
+    @Experimental
+    default Future<NewPlcResponse> query(String s, Object... args) {
+        throw new UnsupportedOperationException("query is not implemented yet");
+    }
+
+    /**
+     * Placeholder for a definition statement like the ones below; always throws until overridden.
+     * <code>define { "item1": "%DB400.xxx", "item2": "xx" } AS my_pymél_struct</code>
+     * <code>define "%DB400" AS "my_structure"</code>
+     *
+     * @param pql the statement text, in one of the forms sketched above
+     * @return a future for the outcome of the statement (never returned by this default)
+     * @throws UnsupportedOperationException always, as there is no default implementation
+     */
+    @Experimental
+    default Future<Boolean> execute(String pql) {
+        throw new UnsupportedOperationException("execute is not implemented yet");
+    }
+
+    /**
+     * Planned successor for
+     * {@link org.apache.plc4x.java.api.messages.PlcResponse}
+     * {@link org.apache.plc4x.java.api.messages.PlcReadResponse}
+     * {@link org.apache.plc4x.java.api.messages.PlcWriteResponse}
+     * {@link org.apache.plc4x.java.api.messages.PlcSubscriptionResponse}
+     * {@link org.apache.plc4x.java.api.messages.PlcUnsubscriptionResponse}
+     */
+    @Experimental
+    interface NewPlcResponse {
+    }
+}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcReadResponse.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcReadResponse.java
index 0ab3547..0e56abd 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcReadResponse.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcReadResponse.java
@@ -18,7 +18,7 @@
  */
 package org.apache.plc4x.java.api.messages;
 
-import org.apache.plc4x.java.api.Changed;
+import org.apache.plc4x.java.api.Experimental;
 import org.apache.plc4x.java.api.value.PlcValue;
 
 import java.math.BigDecimal;
@@ -36,7 +36,7 @@ public interface PlcReadResponse extends PlcFieldResponse {
     @Override
     PlcReadRequest getRequest();
 
-    @Changed
+    @Experimental
     PlcValue getAsPlcValue();
 
     int getNumberOfValues(String name);
diff --git a/plc4j/examples/hello-world-plc4x/src/main/java/org/apache/plc4x/java/examples/helloplc4x/HelloPlc4x.java b/plc4j/examples/hello-world-plc4x/src/main/java/org/apache/plc4x/java/examples/helloplc4x/HelloPlc4x.java
index 8f8b23e..972550b 100644
--- a/plc4j/examples/hello-world-plc4x/src/main/java/org/apache/plc4x/java/examples/helloplc4x/HelloPlc4x.java
+++ b/plc4j/examples/hello-world-plc4x/src/main/java/org/apache/plc4x/java/examples/helloplc4x/HelloPlc4x.java
@@ -28,6 +28,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Future;
 
 public class HelloPlc4x {
 
@@ -79,7 +81,7 @@ public class HelloPlc4x {
             // Read asynchronously ...
             // Register a callback executed as soon as a response arrives.
             logger.info("Asynchronous request ...");
-            CompletableFuture<? extends PlcReadResponse> asyncResponse = readRequest.execute();
+            CompletionStage<? extends PlcReadResponse> asyncResponse = readRequest.execute();
             asyncResponse.whenComplete((readResponse, throwable) -> {
                 if (readResponse != null) {
                     printResponse(readResponse);
diff --git a/plc4j/examples/hello-world-plc4x/src/main/resources/logback.xml b/plc4j/examples/hello-world-plc4x/src/main/resources/logback.xml
index 27d40c0..a8ddebb 100644
--- a/plc4j/examples/hello-world-plc4x/src/main/resources/logback.xml
+++ b/plc4j/examples/hello-world-plc4x/src/main/resources/logback.xml
@@ -29,7 +29,7 @@
     </encoder>
   </appender>
 
-  <root level="info">
+  <root level="trace">
     <appender-ref ref="STDOUT" />
   </root>
 
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
index 7f21126..37f2145 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
@@ -20,6 +20,7 @@
 package org.apache.plc4x.java.spi;
 
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
 import io.netty.handler.codec.MessageToMessageCodec;
 import io.vavr.control.Either;
 import org.apache.plc4x.java.spi.events.ConnectEvent;
@@ -40,29 +41,50 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
 
-public class Plc4xNettyWrapper<T> extends MessageToMessageCodec<T, PlcRequestContainer> {
+public class Plc4xNettyWrapper<T> extends MessageToMessageCodec<T, Object> {
 
     private static final Logger logger = LoggerFactory.getLogger(Plc4xNettyWrapper.class);
 
     private final Plc4xProtocolBase<T> protocolBase;
     private final Queue<HandlerRegistration> registeredHandlers;
+    private final ChannelPipeline pipeline;
 
-    public Plc4xNettyWrapper(Plc4xProtocolBase<T> parent, Class<T> clazz) {
-        super(clazz, PlcRequestContainer.class);
+    public Plc4xNettyWrapper(ChannelPipeline pipeline, Plc4xProtocolBase<T> protocol, Class<T> clazz) {
+        super(clazz, Object.class);
+        this.pipeline = pipeline;
         this.registeredHandlers = new ConcurrentLinkedQueue<>();
-        this.protocolBase = parent;
+        this.protocolBase = protocol;
+        this.protocolBase.setContext(new ConversationContext<T>() {
+            @Override public void sendToWire(T msg) {
+                pipeline.writeAndFlush(msg);
+            }
+
+            @Override public void fireConnected() {
+                pipeline.fireUserEventTriggered(ConnectedEvent.class);
+            }
+
+            @Override public SendRequestContext<T> sendRequest(T packet) {
+                return new DefaultSendRequestContext<T>(handler -> {
+                    logger.trace("Adding Response Handler...");
+                    registeredHandlers.add(handler);
+                }, packet, this);
+            }
+        });
     }
 
     @Override
-    protected void encode(ChannelHandlerContext channelHandlerContext, PlcRequestContainer plcRequestContainer, List<Object> list) throws Exception {
-        logger.trace("Encoding {}", plcRequestContainer);
-        protocolBase.encode(new DefaultConversationContext<T>(channelHandlerContext) {
-            @Override
-            public void sendToWire(T msg) {
-                logger.trace("Sending to wire {}", msg);
-                list.add(msg);
-            }
-        }, plcRequestContainer);
+    protected void encode(ChannelHandlerContext channelHandlerContext, Object msg, List<Object> list) throws Exception {
+//        logger.trace("Encoding {}", plcRequestContainer);
+//        protocolBase.encode(new DefaultConversationContext<T>(channelHandlerContext) {
+//            @Override
+//            public void sendToWire(T msg) {
+//                logger.trace("Sending to wire {}", msg);
+//                list.add(msg);
+//            }
+//        }, plcRequestContainer);
+        // NOOP
+        logger.info("Forwarding request to plc {}", msg);
+        list.add(msg);
     }
 
     @Override
@@ -145,7 +167,7 @@ public class Plc4xNettyWrapper<T> extends MessageToMessageCodec<T, PlcRequestCon
             return new DefaultSendRequestContext<T1>(handler -> {
                 logger.trace("Adding Response Handler...");
                 registeredHandlers.add(handler);
-            }, packet, (DefaultConversationContext)this);
+            }, packet, this);
         }
     }
 }
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xProtocolBase.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xProtocolBase.java
index f8fd0a2..24f51f6 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xProtocolBase.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xProtocolBase.java
@@ -19,27 +19,35 @@
 
 package org.apache.plc4x.java.spi;
 
-import org.apache.plc4x.java.spi.messages.PlcRequestContainer;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionResponse;
+import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.java.api.messages.PlcWriteResponse;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
 
 public abstract class Plc4xProtocolBase<T> {
 
+    protected ConversationContext<T> context;
+
+    public void setContext(ConversationContext<T> context) {
+        this.context = context;
+    }
+
     public void onConnect(ConversationContext<T> context) {
         // Intentionally do nothing here
     }
 
     /**
-     * @param context
-     * @param msg
-     * @throws Exception
-     * @deprecated will be replaced by direct calls
-     */
-    @Deprecated
-    protected abstract void encode(ConversationContext<T> context, PlcRequestContainer msg) throws Exception;
-
-    /**
      * TODO document me
      * <p>
-     * Can be used for non request incoming messages
+     * Can be used for non requested incoming messages
      *
      * @param context
      * @param msg
@@ -48,4 +56,19 @@ public abstract class Plc4xProtocolBase<T> {
     protected void decode(ConversationContext<T> context, T msg) throws Exception {
     }
 
+    public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) {
+        throw new NotImplementedException("");
+    }
+
+    public CompletableFuture<PlcWriteResponse> write(PlcWriteRequest writeRequest) {
+        throw new NotImplementedException("");
+    }
+
+    public CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionRequest subscriptionRequest) {
+        throw new NotImplementedException("");
+    }
+
+    public CompletableFuture<PlcUnsubscriptionResponse> unsubscribe(PlcUnsubscriptionRequest unsubscriptionRequest) {
+        throw new NotImplementedException("");
+    }
 }
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/AbstractPlcConnection.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/AbstractPlcConnection.java
index 9c0a89a..4b3d2ac 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/AbstractPlcConnection.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/AbstractPlcConnection.java
@@ -21,14 +21,32 @@ package org.apache.plc4x.java.spi.connection;
 import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcUnsupportedOperationException;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
 import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
 import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionResponse;
 import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.java.api.messages.PlcWriteResponse;
 import org.apache.plc4x.java.api.metadata.PlcConnectionMetadata;
+import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
+import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
+import org.apache.plc4x.java.spi.Plc4xProtocolBase;
+import org.apache.plc4x.java.spi.messages.DefaultPlcReadRequest;
+import org.apache.plc4x.java.spi.messages.DefaultPlcSubscriptionRequest;
+import org.apache.plc4x.java.spi.messages.DefaultPlcUnsubscriptionRequest;
+import org.apache.plc4x.java.spi.messages.DefaultPlcWriteRequest;
 import org.apache.plc4x.java.spi.messages.InternalPlcMessage;
+import org.apache.plc4x.java.spi.messages.PlcReader;
+import org.apache.plc4x.java.spi.messages.PlcSubscriber;
+import org.apache.plc4x.java.spi.messages.PlcWriter;
 
+import java.util.Collection;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
 
 /**
  * Base class for implementing connections.
@@ -36,7 +54,31 @@ import java.util.concurrent.CompletableFuture;
  * Concrete implementations should override the methods indicating connection capabilities
  * and for obtaining respective request builders.
  */
-public abstract class AbstractPlcConnection implements PlcConnection, PlcConnectionMetadata {
+public abstract class AbstractPlcConnection implements PlcConnection, PlcConnectionMetadata, PlcReader, PlcWriter , PlcSubscriber {
+
+    private boolean canRead = false;
+    private boolean canWrite = false;
+    private boolean canSubscribe = false;
+    private PlcFieldHandler fieldHandler;
+    private Plc4xProtocolBase<?> protocol;
+
+    /**
+     * @deprecated only for compatibility reasons.
+     */
+    @Deprecated
+    public AbstractPlcConnection() {
+    }
+
+    public AbstractPlcConnection(boolean canRead, boolean canWrite, boolean canSubscribe, PlcFieldHandler fieldHandler) {
+        this.canRead = canRead;
+        this.canWrite = canWrite;
+        this.canSubscribe = canSubscribe;
+        this.fieldHandler = fieldHandler;
+    }
+
+    public void setProtocol(Plc4xProtocolBase<?> protocol) {
+        this.protocol = protocol;
+    }
 
     @Override
     public PlcConnectionMetadata getMetadata() {
@@ -52,37 +94,77 @@ public abstract class AbstractPlcConnection implements PlcConnection, PlcConnect
 
     @Override
     public boolean canRead() {
-        return false;
+        return canRead;
     }
 
     @Override
     public boolean canWrite() {
-        return false;
+        return canWrite;
     }
 
     @Override
     public boolean canSubscribe() {
-        return false;
+        return canSubscribe;
+    }
+
+    public PlcFieldHandler getPlcFieldHandler() {
+        return this.fieldHandler;
     }
 
     @Override
     public PlcReadRequest.Builder readRequestBuilder() {
-        throw new PlcUnsupportedOperationException("The connection does not support reading");
+        if (!canRead()) {
+            throw new PlcUnsupportedOperationException("The connection does not support reading");
+        }
+        return new DefaultPlcReadRequest.Builder(this, getPlcFieldHandler());
     }
 
     @Override
     public PlcWriteRequest.Builder writeRequestBuilder() {
-        throw new PlcUnsupportedOperationException("The connection does not support writing");
+        if (!canWrite()) {
+            throw new PlcUnsupportedOperationException("The connection does not support writing");
+        }
+        return new DefaultPlcWriteRequest.Builder(this, getPlcFieldHandler());
     }
 
     @Override
     public PlcSubscriptionRequest.Builder subscriptionRequestBuilder() {
-        throw new PlcUnsupportedOperationException("The connection does not support subscription");
+        if (!canSubscribe()) {
+            throw new PlcUnsupportedOperationException("The connection does not support subscription");
+        }
+        return new DefaultPlcSubscriptionRequest.Builder(this, getPlcFieldHandler());
     }
 
     @Override
     public PlcUnsubscriptionRequest.Builder unsubscriptionRequestBuilder() {
-        throw new PlcUnsupportedOperationException("The connection does not support subscription");
+        if (!canSubscribe()) { // ask the accessor, like the other builders, so subclass overrides are honoured
+            throw new PlcUnsupportedOperationException("The connection does not support subscription");
+        }
+        return new DefaultPlcUnsubscriptionRequest.Builder(this);
+    }
+
+    @Override public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) {
+        return protocol.read(readRequest);
+    }
+
+    @Override public CompletableFuture<PlcWriteResponse> write(PlcWriteRequest writeRequest) {
+        return protocol.write(writeRequest);
+    }
+
+    @Override public CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionRequest subscriptionRequest) {
+        return protocol.subscribe(subscriptionRequest);
+    }
+
+    @Override public CompletableFuture<PlcUnsubscriptionResponse> unsubscribe(PlcUnsubscriptionRequest unsubscriptionRequest) {
+        return protocol.unsubscribe(unsubscriptionRequest);
+    }
+
+    @Override public PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer, Collection<PlcSubscriptionHandle> handles) {
+        return null;
+    }
+
+    @Override public void unregister(PlcConsumerRegistration registration) {
+
     }
 
     /**
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/NettyPlcConnection.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/NettyPlcConnection.java
index 4a481f0..747b725 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/NettyPlcConnection.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/NettyPlcConnection.java
@@ -25,6 +25,7 @@ import io.netty.util.Timer;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.exceptions.PlcException;
 import org.apache.plc4x.java.api.exceptions.PlcIoException;
+import org.apache.plc4x.java.spi.Plc4xProtocolBase;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -55,6 +56,13 @@ public abstract class NettyPlcConnection extends AbstractPlcConnection {
         this.connected = false;
     }
 
+    protected NettyPlcConnection(ChannelFactory channelFactory, boolean awaitSessionSetupComplete, PlcFieldHandler handler) {
+        super(true, true, false, handler);
+        this.channelFactory = channelFactory;
+        this.awaitSessionSetupComplete = awaitSessionSetupComplete;
+        this.connected = false;
+    }
+
     @Override
     public void connect() throws PlcConnectionException {
         try {
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java
index 45bc4fb..0755259 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java
@@ -42,7 +42,7 @@ public class DefaultSendRequestContext<T> implements ConversationContext.SendReq
     private final Object request;
 
     @SuppressWarnings("unchecked")
-    private final Plc4xNettyWrapper.DefaultConversationContext context;
+    private final ConversationContext context;
 
     protected Class<?> expectClazz;
 
@@ -54,13 +54,13 @@ public class DefaultSendRequestContext<T> implements ConversationContext.SendReq
 
     protected Duration timeout;
 
-    public DefaultSendRequestContext(Consumer<HandlerRegistration> finisher, T request, Plc4xNettyWrapper<T>.DefaultConversationContext<T> context) {
+    public DefaultSendRequestContext(Consumer<HandlerRegistration> finisher, T request, ConversationContext<T> context) {
         this.finisher = finisher;
         this.request = request;
         this.context = context;
     }
 
-    protected DefaultSendRequestContext(Deque<Either<Function<?, ?>, Predicate<?>>> commands, Duration timeout, Consumer<HandlerRegistration> finisher, Object request, Plc4xNettyWrapper<?>.DefaultConversationContext<?> context, Class<?> expectClazz, Consumer<?> packetConsumer, Consumer<TimeoutException> onTimeoutConsumer, BiConsumer<?, ? extends Throwable> errorConsumer) {
+    protected DefaultSendRequestContext(Deque<Either<Function<?, ?>, Predicate<?>>> commands, Duration timeout, Consumer<HandlerRegistration> finisher, Object request, ConversationContext<?> context, Class<?> expectClazz, Consumer<?> packetConsumer, Consumer<TimeoutException> onTimeoutConsumer, BiConsumer<?, ? extends Throwable> errorConsumer) {
         this.commands = commands;
         this.timeout = timeout;
         this.finisher = finisher;
diff --git a/plc4j/utils/driver-base-java/src/main/java/org/apache/plc4x/java/spi/GeneratedDriverByteToMessageCodec.java b/plc4j/utils/driver-base-java/src/main/java/org/apache/plc4x/java/spi/GeneratedDriverByteToMessageCodec.java
index 7282609..37cf142 100644
--- a/plc4j/utils/driver-base-java/src/main/java/org/apache/plc4x/java/spi/GeneratedDriverByteToMessageCodec.java
+++ b/plc4j/utils/driver-base-java/src/main/java/org/apache/plc4x/java/spi/GeneratedDriverByteToMessageCodec.java
@@ -45,10 +45,12 @@ public abstract class GeneratedDriverByteToMessageCodec<T extends Message> exten
         WriteBuffer buffer = new WriteBuffer(packet.getLengthInBytes());
         io.serialize(buffer, packet);
         byteBuf.writeBytes(buffer.getData());
+        logger.debug("Sending bytes to PLC for message {} as data {}", packet, Hex.encodeHexString(buffer.getData()));
     }
 
     @Override
     protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
+        logger.trace("Receiving bytes, trying to decode Message...");
         // As long as there is data available, continue checking the content.
         while(byteBuf.readableBytes() > 0) {
             // Check if enough data is present to process the entire package.
diff --git a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7Connection.java b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7Connection.java
index 7677342..dfbbcec 100644
--- a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7Connection.java
+++ b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7Connection.java
@@ -72,7 +72,7 @@ public class S7Connection extends NettyPlcConnection implements PlcReader, PlcWr
     }
 
     public S7Connection(ChannelFactory channelFactory, String params) {
-        super(channelFactory, true);
+        super(channelFactory, true, new S7PlcFieldHandler());
 
         short curRack = 1;
         short curSlot = 1;
@@ -167,7 +167,8 @@ public class S7Connection extends NettyPlcConnection implements PlcReader, PlcWr
                 pipeline.addLast(new S7Protocol());
                 Plc4xProtocolBase<TPKTPacket> plc4xS7Protocol = new Plc4xS7Protocol(callingTsapId, calledTsapId, tpduSize,
                     maxAmqCaller, maxAmqCallee, controllerType);
-                Plc4xNettyWrapper<TPKTPacket> context = new Plc4xNettyWrapper<>(plc4xS7Protocol, TPKTPacket.class);
+                setProtocol(plc4xS7Protocol);
+                Plc4xNettyWrapper<TPKTPacket> context = new Plc4xNettyWrapper<>(pipeline, plc4xS7Protocol, TPKTPacket.class);
                 pipeline.addLast(context);
             }
         };
@@ -184,21 +185,6 @@ public class S7Connection extends NettyPlcConnection implements PlcReader, PlcWr
     }
 
     @Override
-    public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) {
-        InternalPlcReadRequest internalReadRequest = checkInternal(readRequest, InternalPlcReadRequest.class);
-        CompletableFuture<InternalPlcReadResponse> future = new CompletableFuture<>();
-        PlcRequestContainer<InternalPlcReadRequest, InternalPlcReadResponse> container =
-            new PlcRequestContainer<>(internalReadRequest, future);
-        channel.writeAndFlush(container).addListener(f -> {
-            if (!f.isSuccess()) {
-                future.completeExceptionally(f.cause());
-            }
-        });
-        return future
-            .thenApply(PlcReadResponse.class::cast);
-    }
-
-    @Override
     public CompletableFuture<PlcWriteResponse> write(PlcWriteRequest writeRequest) {
         InternalPlcWriteRequest internalWriteRequest = checkInternal(writeRequest, InternalPlcWriteRequest.class);
         CompletableFuture<InternalPlcWriteResponse> future = new CompletableFuture<>();
diff --git a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/Plc4xS7Protocol.java b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/Plc4xS7Protocol.java
index 9c0e3fd..4b604e0 100644
--- a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/Plc4xS7Protocol.java
+++ b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/Plc4xS7Protocol.java
@@ -25,7 +25,15 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.plc4x.java.api.exceptions.PlcProtocolException;
 import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
 import org.apache.plc4x.java.api.messages.PlcResponse;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionResponse;
+import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.java.api.messages.PlcWriteResponse;
 import org.apache.plc4x.java.api.model.PlcField;
 import org.apache.plc4x.java.api.types.PlcResponseCode;
 import org.apache.plc4x.java.s7.readwrite.*;
@@ -50,6 +58,8 @@ import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.temporal.ChronoUnit;
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -128,6 +138,40 @@ public class Plc4xS7Protocol extends Plc4xProtocolBase<TPKTPacket> {
             });
     }
 
+    @Override public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) { // Sends an S7 read-var request; the returned future carries the decoded response or the failure.
+        CompletableFuture<PlcReadResponse> future = new CompletableFuture<>();
+        DefaultPlcReadRequest request = (DefaultPlcReadRequest) readRequest;
+        List<S7VarRequestParameterItem> requestItems = new ArrayList<>(request.getNumberOfFields());
+        for (PlcField field : request.getFields()) {
+            requestItems.add(new S7VarRequestParameterItemAddress(toS7Address(field)));
+        }
+        final int tpduId = tpduGenerator.getAndIncrement(); // Used below to match the response to this request.
+        TPKTPacket tpktPacket = new TPKTPacket(new COTPPacketData(null,
+            new S7MessageRequest(tpduId,
+                new S7ParameterReadVarRequest(requestItems.toArray(new S7VarRequestParameterItem[0])),
+                new S7PayloadReadVarRequest()),
+            true, (short) tpduId));
+
+        context.sendRequest(tpktPacket)
+            .expectResponse(TPKTPacket.class, Duration.ofMillis(1000))
+            .onTimeout(future::completeExceptionally)
+            .onError((p, e) -> future.completeExceptionally(e))
+            .check(p -> p.getPayload() instanceof COTPPacketData)
+            .unwrap(p -> ((COTPPacketData) p.getPayload()))
+            .check(p -> p.getPayload() instanceof S7MessageResponse)
+            .unwrap(p -> ((S7MessageResponse) p.getPayload()))
+            .check(p -> p.getTpduReference() == tpduId)
+            .check(p -> p.getParameter() instanceof S7ParameterReadVarResponse)
+            .handle(p -> {
+                try {
+                    future.complete(((PlcReadResponse) decodeReadResponse(p, ((InternalPlcReadRequest) readRequest))));
+                } catch (PlcProtocolException e) {
+                    future.completeExceptionally(e); // Surface decode failures to the caller; only printing the trace left the future pending forever.
+                }
+            });
+        return future;
+    }
+
     private void extractControllerTypeAndFireConnected(ConversationContext<TPKTPacket> context, S7PayloadUserData payloadUserData) {
         for (S7PayloadUserDataItem item : payloadUserData.getItems()) {
             if (!(item instanceof S7PayloadUserDataItemCpuFunctionReadSzlResponse)) {
@@ -191,45 +235,7 @@ public class Plc4xS7Protocol extends Plc4xProtocolBase<TPKTPacket> {
             }, null, (short) 0x0000, (short) 0x000F, COTPProtocolClass.CLASS_0);
     }
 
-    @Override
-    protected void encode(ConversationContext<TPKTPacket> context, PlcRequestContainer msg) throws Exception {
-        if (msg.getRequest() instanceof DefaultPlcReadRequest) {
-            DefaultPlcReadRequest request = (DefaultPlcReadRequest) msg.getRequest();
-            List<S7VarRequestParameterItem> requestItems = new ArrayList<>(request.getNumberOfFields());
-            for (PlcField field : request.getFields()) {
-                requestItems.add(new S7VarRequestParameterItemAddress(toS7Address(field)));
-            }
-            final int tpduId = tpduGenerator.getAndIncrement();
-            TPKTPacket tpktPacket = new TPKTPacket(new COTPPacketData(null,
-                new S7MessageRequest(tpduId,
-                    new S7ParameterReadVarRequest(requestItems.toArray(new S7VarRequestParameterItem[0])),
-                    new S7PayloadReadVarRequest()),
-                true, (short) tpduId));
-
-            context.sendRequest(tpktPacket)
-                .expectResponse(TPKTPacket.class, Duration.ofMillis(1000))
-                // TODO: make it really optional
-                .check(p -> p.getPayload() instanceof COTPPacketData)
-                .unwrap(p -> ((COTPPacketData) p.getPayload()))
-                .check(p -> p.getPayload() instanceof S7MessageResponse)
-                .unwrap(p -> ((S7MessageResponse) p.getPayload()))
-                .check(p -> p.getTpduReference() == tpduId)
-                .check(p -> p.getParameter() instanceof S7ParameterReadVarResponse)
-                .handle(p -> {
-                    try {
-                        msg.getResponseFuture().complete(decodeReadResponse(p, msg));
-                    } catch (PlcProtocolException e) {
-                        e.printStackTrace();
-                    }
-                });
-        } else {
-            throw new NotImplementedException("Implement me");
-        }
-    }
-
-    private PlcResponse decodeReadResponse(S7MessageResponse responseMessage, PlcRequestContainer requestContainer) throws PlcProtocolException {
-        InternalPlcReadRequest plcReadRequest = (InternalPlcReadRequest) requestContainer.getRequest();
-
+    private PlcResponse decodeReadResponse(S7MessageResponse responseMessage, InternalPlcReadRequest plcReadRequest) throws PlcProtocolException {
         S7PayloadReadVarResponse payload = (S7PayloadReadVarResponse) responseMessage.getPayload();
 
         // If the numbers of items don't match, we're in big trouble as the only