You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by jf...@apache.org on 2019/12/19 10:53:40 UTC

[plc4x] branch next-gen-core updated (4681d64 -> e1223ff)

This is an automated email from the ASF dual-hosted git repository.

jfeinauer pushed a change to branch next-gen-core
in repository https://gitbox.apache.org/repos/asf/plc4x.git.


 discard 4681d64  Added Experimental Annotation. - Implemented default methods for read, write, subscribe, ... - Direct communication from connection to protocol
     new e1223ff  Added Experimental Annotation. - Implemented default methods for read, write, subscribe, ... - Direct communication from connection to protocol

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (4681d64)
            \
             N -- N -- N   refs/heads/next-gen-core (e1223ff)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../s7/readwrite/protocol/Plc4xS7Protocol.java     | 71 ++++++++++++++++++----
 .../java/s7/readwrite/protocol/S7Protocol.java     |  4 +-
 2 files changed, 60 insertions(+), 15 deletions(-)


[plc4x] 01/01: Added Experimental Annotation. - Implemented default methods for read, write, subscribe, ... - Direct communication from connection to protocol

Posted by jf...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jfeinauer pushed a commit to branch next-gen-core
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit e1223ff96dac1f38016a81e6702e07d18c9c9204
Author: Julian Feinauer <j....@pragmaticminds.de>
AuthorDate: Thu Dec 19 11:51:36 2019 +0100

    Added Experimental Annotation.
    - Implemented default methods for read, write, subscribe, ...
    - Direct communication from connection to protocol
---
 .../org/apache/plc4x/java/PlcDriverManager.java    |   4 +-
 .../java/api/{Changed.java => Experimental.java}   |   2 +-
 .../plc4x/java/api/PlcConnectionExtension.java     |  83 ++++++++++++
 .../plc4x/java/api/messages/PlcReadResponse.java   |   4 +-
 .../plc4x/java/examples/helloplc4x/HelloPlc4x.java |   4 +-
 .../src/main/resources/logback.xml                 |   2 +-
 .../apache/plc4x/java/spi/Plc4xNettyWrapper.java   |  50 ++++++--
 .../apache/plc4x/java/spi/Plc4xProtocolBase.java   |  45 +++++--
 .../java/spi/connection/AbstractPlcConnection.java |  98 ++++++++++++--
 .../java/spi/connection/NettyPlcConnection.java    |   8 ++
 .../spi/internal/DefaultSendRequestContext.java    |   6 +-
 .../spi/GeneratedDriverByteToMessageCodec.java     |   2 +
 .../java/s7/readwrite/connection/S7Connection.java |  20 +--
 .../s7/readwrite/protocol/Plc4xS7Protocol.java     | 141 ++++++++++++++-------
 .../java/s7/readwrite/protocol/S7Protocol.java     |   4 +-
 15 files changed, 366 insertions(+), 107 deletions(-)

diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/PlcDriverManager.java b/plc4j/api/src/main/java/org/apache/plc4x/java/PlcDriverManager.java
index 710cdb1..13efee1 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/PlcDriverManager.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/PlcDriverManager.java
@@ -18,7 +18,7 @@
  */
 package org.apache.plc4x.java;
 
-import org.apache.plc4x.java.api.Changed;
+import org.apache.plc4x.java.api.Experimental;
 import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.authentication.PlcAuthentication;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
@@ -96,7 +96,7 @@ public class PlcDriverManager {
      * @return Driver instance for the given protocol
      * @throws PlcConnectionException If no Suitable Driver can be found
      */
-    @Changed
+    @Experimental
     public PlcDriver getDriver(String url) throws PlcConnectionException {
         try {
             URI connectionUri = new URI(url);
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/Changed.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/Experimental.java
similarity index 96%
rename from plc4j/api/src/main/java/org/apache/plc4x/java/api/Changed.java
rename to plc4j/api/src/main/java/org/apache/plc4x/java/api/Experimental.java
index 5e3946b..8d2d8d3 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/Changed.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/Experimental.java
@@ -22,5 +22,5 @@ package org.apache.plc4x.java.api;
 /**
  * Indicates that this is a recent API Change.
  */
-public @interface Changed {
+public @interface Experimental {
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/PlcConnectionExtension.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/PlcConnectionExtension.java
new file mode 100644
index 0000000..5f36f24
--- /dev/null
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/PlcConnectionExtension.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.api;
+
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+import java.util.concurrent.Future;
+
+/**
+ * Suggestion for new API
+ */
+@Experimental
+public interface PlcConnectionExtension {
+
+    /**
+     * <code>
+     *     "%DB400.xxx"
+     * </code>
+     * <code>
+     *     ["%DB400.xxx", ""]
+     * </code>
+     * <code>
+     *     { "item1": "%DB400.xxx", "item2": "xx" }
+     * </code>
+     * Prepared Statement
+     * <code>
+     *     { "item1": ?, "item2": ? }
+     * </code>
+     *
+     * @param s
+     * @return
+     */
+    @Experimental
+    default Future<NewPlcResponse> query(String s, Object... args) {
+        throw new NotImplementedException();
+    }
+
+    /**
+     * <code>
+     *     define { "item1": "%DB400.xxx", "item2": "xx" } AS my_pymél_struct
+     * </code>
+     * <code>
+     *     define "%DB400" AS "my_structure"
+     * </code>
+     * @param pql
+     * @return
+     */
+    @Experimental
+    default Future<Boolean> execute(String pql) {
+        throw new NotImplementedException();
+    }
+
+    /**
+     * Planned successor for
+     * {@link org.apache.plc4x.java.api.messages.PlcResponse}
+     * {@link org.apache.plc4x.java.api.messages.PlcReadResponse}
+     * {@link org.apache.plc4x.java.api.messages.PlcWriteResponse}
+     * {@link org.apache.plc4x.java.api.messages.PlcSubscriptionResponse}
+     * {@link org.apache.plc4x.java.api.messages.PlcUnsubscriptionResponse}
+     */
+    @Experimental
+    interface NewPlcResponse {
+
+    }
+}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcReadResponse.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcReadResponse.java
index 0ab3547..0e56abd 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcReadResponse.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcReadResponse.java
@@ -18,7 +18,7 @@
  */
 package org.apache.plc4x.java.api.messages;
 
-import org.apache.plc4x.java.api.Changed;
+import org.apache.plc4x.java.api.Experimental;
 import org.apache.plc4x.java.api.value.PlcValue;
 
 import java.math.BigDecimal;
@@ -36,7 +36,7 @@ public interface PlcReadResponse extends PlcFieldResponse {
     @Override
     PlcReadRequest getRequest();
 
-    @Changed
+    @Experimental
     PlcValue getAsPlcValue();
 
     int getNumberOfValues(String name);
diff --git a/plc4j/examples/hello-world-plc4x/src/main/java/org/apache/plc4x/java/examples/helloplc4x/HelloPlc4x.java b/plc4j/examples/hello-world-plc4x/src/main/java/org/apache/plc4x/java/examples/helloplc4x/HelloPlc4x.java
index 8f8b23e..972550b 100644
--- a/plc4j/examples/hello-world-plc4x/src/main/java/org/apache/plc4x/java/examples/helloplc4x/HelloPlc4x.java
+++ b/plc4j/examples/hello-world-plc4x/src/main/java/org/apache/plc4x/java/examples/helloplc4x/HelloPlc4x.java
@@ -28,6 +28,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Future;
 
 public class HelloPlc4x {
 
@@ -79,7 +81,7 @@ public class HelloPlc4x {
             // Read asynchronously ...
             // Register a callback executed as soon as a response arrives.
             logger.info("Asynchronous request ...");
-            CompletableFuture<? extends PlcReadResponse> asyncResponse = readRequest.execute();
+            CompletionStage<? extends PlcReadResponse> asyncResponse = readRequest.execute();
             asyncResponse.whenComplete((readResponse, throwable) -> {
                 if (readResponse != null) {
                     printResponse(readResponse);
diff --git a/plc4j/examples/hello-world-plc4x/src/main/resources/logback.xml b/plc4j/examples/hello-world-plc4x/src/main/resources/logback.xml
index 27d40c0..a8ddebb 100644
--- a/plc4j/examples/hello-world-plc4x/src/main/resources/logback.xml
+++ b/plc4j/examples/hello-world-plc4x/src/main/resources/logback.xml
@@ -29,7 +29,7 @@
     </encoder>
   </appender>
 
-  <root level="info">
+  <root level="trace">
     <appender-ref ref="STDOUT" />
   </root>
 
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
index 7f21126..37f2145 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
@@ -20,6 +20,7 @@
 package org.apache.plc4x.java.spi;
 
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
 import io.netty.handler.codec.MessageToMessageCodec;
 import io.vavr.control.Either;
 import org.apache.plc4x.java.spi.events.ConnectEvent;
@@ -40,29 +41,50 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
 
-public class Plc4xNettyWrapper<T> extends MessageToMessageCodec<T, PlcRequestContainer> {
+public class Plc4xNettyWrapper<T> extends MessageToMessageCodec<T, Object> {
 
     private static final Logger logger = LoggerFactory.getLogger(Plc4xNettyWrapper.class);
 
     private final Plc4xProtocolBase<T> protocolBase;
     private final Queue<HandlerRegistration> registeredHandlers;
+    private final ChannelPipeline pipeline;
 
-    public Plc4xNettyWrapper(Plc4xProtocolBase<T> parent, Class<T> clazz) {
-        super(clazz, PlcRequestContainer.class);
+    public Plc4xNettyWrapper(ChannelPipeline pipeline, Plc4xProtocolBase<T> protocol, Class<T> clazz) {
+        super(clazz, Object.class);
+        this.pipeline = pipeline;
         this.registeredHandlers = new ConcurrentLinkedQueue<>();
-        this.protocolBase = parent;
+        this.protocolBase = protocol;
+        this.protocolBase.setContext(new ConversationContext<T>() {
+            @Override public void sendToWire(T msg) {
+                pipeline.writeAndFlush(msg);
+            }
+
+            @Override public void fireConnected() {
+                pipeline.fireUserEventTriggered(ConnectedEvent.class);
+            }
+
+            @Override public SendRequestContext<T> sendRequest(T packet) {
+                return new DefaultSendRequestContext<T>(handler -> {
+                    logger.trace("Adding Response Handler...");
+                    registeredHandlers.add(handler);
+                }, packet, this);
+            }
+        });
     }
 
     @Override
-    protected void encode(ChannelHandlerContext channelHandlerContext, PlcRequestContainer plcRequestContainer, List<Object> list) throws Exception {
-        logger.trace("Encoding {}", plcRequestContainer);
-        protocolBase.encode(new DefaultConversationContext<T>(channelHandlerContext) {
-            @Override
-            public void sendToWire(T msg) {
-                logger.trace("Sending to wire {}", msg);
-                list.add(msg);
-            }
-        }, plcRequestContainer);
+    protected void encode(ChannelHandlerContext channelHandlerContext, Object msg, List<Object> list) throws Exception {
+//        logger.trace("Encoding {}", plcRequestContainer);
+//        protocolBase.encode(new DefaultConversationContext<T>(channelHandlerContext) {
+//            @Override
+//            public void sendToWire(T msg) {
+//                logger.trace("Sending to wire {}", msg);
+//                list.add(msg);
+//            }
+//        }, plcRequestContainer);
+        // NOOP
+        logger.info("Forwarding request to plc {}", msg);
+        list.add(msg);
     }
 
     @Override
@@ -145,7 +167,7 @@ public class Plc4xNettyWrapper<T> extends MessageToMessageCodec<T, PlcRequestCon
             return new DefaultSendRequestContext<T1>(handler -> {
                 logger.trace("Adding Response Handler...");
                 registeredHandlers.add(handler);
-            }, packet, (DefaultConversationContext)this);
+            }, packet, this);
         }
     }
 }
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xProtocolBase.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xProtocolBase.java
index f8fd0a2..24f51f6 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xProtocolBase.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xProtocolBase.java
@@ -19,27 +19,35 @@
 
 package org.apache.plc4x.java.spi;
 
-import org.apache.plc4x.java.spi.messages.PlcRequestContainer;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionResponse;
+import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.java.api.messages.PlcWriteResponse;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
 
 public abstract class Plc4xProtocolBase<T> {
 
+    protected ConversationContext<T> context;
+
+    public void setContext(ConversationContext<T> context) {
+        this.context = context;
+    }
+
     public void onConnect(ConversationContext<T> context) {
         // Intentionally do nothing here
     }
 
     /**
-     * @param context
-     * @param msg
-     * @throws Exception
-     * @deprecated will be replaced by direct calls
-     */
-    @Deprecated
-    protected abstract void encode(ConversationContext<T> context, PlcRequestContainer msg) throws Exception;
-
-    /**
      * TODO document me
      * <p>
-     * Can be used for non request incoming messages
+     * Can be used for non requested incoming messages
      *
      * @param context
      * @param msg
@@ -48,4 +56,19 @@ public abstract class Plc4xProtocolBase<T> {
     protected void decode(ConversationContext<T> context, T msg) throws Exception {
     }
 
+    public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) {
+        throw new NotImplementedException("");
+    }
+
+    public CompletableFuture<PlcWriteResponse> write(PlcWriteRequest writeRequest) {
+        throw new NotImplementedException("");
+    }
+
+    public CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionRequest subscriptionRequest) {
+        throw new NotImplementedException("");
+    }
+
+    public CompletableFuture<PlcUnsubscriptionResponse> unsubscribe(PlcUnsubscriptionRequest unsubscriptionRequest) {
+        throw new NotImplementedException("");
+    }
 }
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/AbstractPlcConnection.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/AbstractPlcConnection.java
index 9c0a89a..4b3d2ac 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/AbstractPlcConnection.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/AbstractPlcConnection.java
@@ -21,14 +21,32 @@ package org.apache.plc4x.java.spi.connection;
 import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcUnsupportedOperationException;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
 import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
 import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionResponse;
 import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.java.api.messages.PlcWriteResponse;
 import org.apache.plc4x.java.api.metadata.PlcConnectionMetadata;
+import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
+import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
+import org.apache.plc4x.java.spi.Plc4xProtocolBase;
+import org.apache.plc4x.java.spi.messages.DefaultPlcReadRequest;
+import org.apache.plc4x.java.spi.messages.DefaultPlcSubscriptionRequest;
+import org.apache.plc4x.java.spi.messages.DefaultPlcUnsubscriptionRequest;
+import org.apache.plc4x.java.spi.messages.DefaultPlcWriteRequest;
 import org.apache.plc4x.java.spi.messages.InternalPlcMessage;
+import org.apache.plc4x.java.spi.messages.PlcReader;
+import org.apache.plc4x.java.spi.messages.PlcSubscriber;
+import org.apache.plc4x.java.spi.messages.PlcWriter;
 
+import java.util.Collection;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
 
 /**
  * Base class for implementing connections.
@@ -36,7 +54,31 @@ import java.util.concurrent.CompletableFuture;
  * Concrete implementations should override the methods indicating connection capabilities
  * and for obtaining respective request builders.
  */
-public abstract class AbstractPlcConnection implements PlcConnection, PlcConnectionMetadata {
+public abstract class AbstractPlcConnection implements PlcConnection, PlcConnectionMetadata, PlcReader, PlcWriter , PlcSubscriber {
+
+    private boolean canRead = false;
+    private boolean canWrite = false;
+    private boolean canSubscribe = false;
+    private PlcFieldHandler fieldHandler;
+    private Plc4xProtocolBase<?> protocol;
+
+    /**
+     * @deprecated only for compatibility reasons.
+     */
+    @Deprecated
+    public AbstractPlcConnection() {
+    }
+
+    public AbstractPlcConnection(boolean canRead, boolean canWrite, boolean canSubscribe, PlcFieldHandler fieldHandler) {
+        this.canRead = canRead;
+        this.canWrite = canWrite;
+        this.canSubscribe = canSubscribe;
+        this.fieldHandler = fieldHandler;
+    }
+
+    public void setProtocol(Plc4xProtocolBase<?> protocol) {
+        this.protocol = protocol;
+    }
 
     @Override
     public PlcConnectionMetadata getMetadata() {
@@ -52,37 +94,77 @@ public abstract class AbstractPlcConnection implements PlcConnection, PlcConnect
 
     @Override
     public boolean canRead() {
-        return false;
+        return canRead;
     }
 
     @Override
     public boolean canWrite() {
-        return false;
+        return canWrite;
     }
 
     @Override
     public boolean canSubscribe() {
-        return false;
+        return canSubscribe;
+    }
+
+    public PlcFieldHandler getPlcFieldHandler() {
+        return this.fieldHandler;
     }
 
     @Override
     public PlcReadRequest.Builder readRequestBuilder() {
-        throw new PlcUnsupportedOperationException("The connection does not support reading");
+        if (!canRead()) {
+            throw new PlcUnsupportedOperationException("The connection does not support reading");
+        }
+        return new DefaultPlcReadRequest.Builder(this, getPlcFieldHandler());
     }
 
     @Override
     public PlcWriteRequest.Builder writeRequestBuilder() {
-        throw new PlcUnsupportedOperationException("The connection does not support writing");
+        if (!canWrite()) {
+            throw new PlcUnsupportedOperationException("The connection does not support writing");
+        }
+        return new DefaultPlcWriteRequest.Builder(this, getPlcFieldHandler());
     }
 
     @Override
     public PlcSubscriptionRequest.Builder subscriptionRequestBuilder() {
-        throw new PlcUnsupportedOperationException("The connection does not support subscription");
+        if (!canSubscribe()) {
+            throw new PlcUnsupportedOperationException("The connection does not support subscription");
+        }
+        return new DefaultPlcSubscriptionRequest.Builder(this, getPlcFieldHandler());
     }
 
     @Override
     public PlcUnsubscriptionRequest.Builder unsubscriptionRequestBuilder() {
-        throw new PlcUnsupportedOperationException("The connection does not support subscription");
+        if (!canSubscribe) {
+            throw new PlcUnsupportedOperationException("The connection does not support subscription");
+        }
+        return new DefaultPlcUnsubscriptionRequest.Builder(this);
+    }
+
+    @Override public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) {
+        return protocol.read(readRequest);
+    }
+
+    @Override public CompletableFuture<PlcWriteResponse> write(PlcWriteRequest writeRequest) {
+        return protocol.write(writeRequest);
+    }
+
+    @Override public CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionRequest subscriptionRequest) {
+        return protocol.subscribe(subscriptionRequest);
+    }
+
+    @Override public CompletableFuture<PlcUnsubscriptionResponse> unsubscribe(PlcUnsubscriptionRequest unsubscriptionRequest) {
+        return protocol.unsubscribe(unsubscriptionRequest);
+    }
+
+    @Override public PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer, Collection<PlcSubscriptionHandle> handles) {
+        return null;
+    }
+
+    @Override public void unregister(PlcConsumerRegistration registration) {
+
     }
 
     /**
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/NettyPlcConnection.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/NettyPlcConnection.java
index 4a481f0..747b725 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/NettyPlcConnection.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/NettyPlcConnection.java
@@ -25,6 +25,7 @@ import io.netty.util.Timer;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.exceptions.PlcException;
 import org.apache.plc4x.java.api.exceptions.PlcIoException;
+import org.apache.plc4x.java.spi.Plc4xProtocolBase;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -55,6 +56,13 @@ public abstract class NettyPlcConnection extends AbstractPlcConnection {
         this.connected = false;
     }
 
+    protected NettyPlcConnection(ChannelFactory channelFactory, boolean awaitSessionSetupComplete, PlcFieldHandler handler) {
+        super(true, true, false, handler);
+        this.channelFactory = channelFactory;
+        this.awaitSessionSetupComplete = awaitSessionSetupComplete;
+        this.connected = false;
+    }
+
     @Override
     public void connect() throws PlcConnectionException {
         try {
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java
index 45bc4fb..0755259 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java
@@ -42,7 +42,7 @@ public class DefaultSendRequestContext<T> implements ConversationContext.SendReq
     private final Object request;
 
     @SuppressWarnings("unchecked")
-    private final Plc4xNettyWrapper.DefaultConversationContext context;
+    private final ConversationContext context;
 
     protected Class<?> expectClazz;
 
@@ -54,13 +54,13 @@ public class DefaultSendRequestContext<T> implements ConversationContext.SendReq
 
     protected Duration timeout;
 
-    public DefaultSendRequestContext(Consumer<HandlerRegistration> finisher, T request, Plc4xNettyWrapper<T>.DefaultConversationContext<T> context) {
+    public DefaultSendRequestContext(Consumer<HandlerRegistration> finisher, T request, ConversationContext<T> context) {
         this.finisher = finisher;
         this.request = request;
         this.context = context;
     }
 
-    protected DefaultSendRequestContext(Deque<Either<Function<?, ?>, Predicate<?>>> commands, Duration timeout, Consumer<HandlerRegistration> finisher, Object request, Plc4xNettyWrapper<?>.DefaultConversationContext<?> context, Class<?> expectClazz, Consumer<?> packetConsumer, Consumer<TimeoutException> onTimeoutConsumer, BiConsumer<?, ? extends Throwable> errorConsumer) {
+    protected DefaultSendRequestContext(Deque<Either<Function<?, ?>, Predicate<?>>> commands, Duration timeout, Consumer<HandlerRegistration> finisher, Object request, ConversationContext<?> context, Class<?> expectClazz, Consumer<?> packetConsumer, Consumer<TimeoutException> onTimeoutConsumer, BiConsumer<?, ? extends Throwable> errorConsumer) {
         this.commands = commands;
         this.timeout = timeout;
         this.finisher = finisher;
diff --git a/plc4j/utils/driver-base-java/src/main/java/org/apache/plc4x/java/spi/GeneratedDriverByteToMessageCodec.java b/plc4j/utils/driver-base-java/src/main/java/org/apache/plc4x/java/spi/GeneratedDriverByteToMessageCodec.java
index 7282609..37cf142 100644
--- a/plc4j/utils/driver-base-java/src/main/java/org/apache/plc4x/java/spi/GeneratedDriverByteToMessageCodec.java
+++ b/plc4j/utils/driver-base-java/src/main/java/org/apache/plc4x/java/spi/GeneratedDriverByteToMessageCodec.java
@@ -45,10 +45,12 @@ public abstract class GeneratedDriverByteToMessageCodec<T extends Message> exten
         WriteBuffer buffer = new WriteBuffer(packet.getLengthInBytes());
         io.serialize(buffer, packet);
         byteBuf.writeBytes(buffer.getData());
+        logger.debug("Sending bytes to PLC for message {} as data {}", packet, Hex.encodeHexString(buffer.getData()));
     }
 
     @Override
     protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
+        logger.trace("Receiving bytes, trying to decode Message...");
         // As long as there is data available, continue checking the content.
         while(byteBuf.readableBytes() > 0) {
             // Check if enough data is present to process the entire package.
diff --git a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7Connection.java b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7Connection.java
index 7677342..dfbbcec 100644
--- a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7Connection.java
+++ b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7Connection.java
@@ -72,7 +72,7 @@ public class S7Connection extends NettyPlcConnection implements PlcReader, PlcWr
     }
 
     public S7Connection(ChannelFactory channelFactory, String params) {
-        super(channelFactory, true);
+        super(channelFactory, true, new S7PlcFieldHandler());
 
         short curRack = 1;
         short curSlot = 1;
@@ -167,7 +167,8 @@ public class S7Connection extends NettyPlcConnection implements PlcReader, PlcWr
                 pipeline.addLast(new S7Protocol());
                 Plc4xProtocolBase<TPKTPacket> plc4xS7Protocol = new Plc4xS7Protocol(callingTsapId, calledTsapId, tpduSize,
                     maxAmqCaller, maxAmqCallee, controllerType);
-                Plc4xNettyWrapper<TPKTPacket> context = new Plc4xNettyWrapper<>(plc4xS7Protocol, TPKTPacket.class);
+                setProtocol(plc4xS7Protocol);
+                Plc4xNettyWrapper<TPKTPacket> context = new Plc4xNettyWrapper<>(pipeline, plc4xS7Protocol, TPKTPacket.class);
                 pipeline.addLast(context);
             }
         };
@@ -184,21 +185,6 @@ public class S7Connection extends NettyPlcConnection implements PlcReader, PlcWr
     }
 
     @Override
-    public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) {
-        InternalPlcReadRequest internalReadRequest = checkInternal(readRequest, InternalPlcReadRequest.class);
-        CompletableFuture<InternalPlcReadResponse> future = new CompletableFuture<>();
-        PlcRequestContainer<InternalPlcReadRequest, InternalPlcReadResponse> container =
-            new PlcRequestContainer<>(internalReadRequest, future);
-        channel.writeAndFlush(container).addListener(f -> {
-            if (!f.isSuccess()) {
-                future.completeExceptionally(f.cause());
-            }
-        });
-        return future
-            .thenApply(PlcReadResponse.class::cast);
-    }
-
-    @Override
     public CompletableFuture<PlcWriteResponse> write(PlcWriteRequest writeRequest) {
         InternalPlcWriteRequest internalWriteRequest = checkInternal(writeRequest, InternalPlcWriteRequest.class);
         CompletableFuture<InternalPlcWriteResponse> future = new CompletableFuture<>();
diff --git a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/Plc4xS7Protocol.java b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/Plc4xS7Protocol.java
index 9c0e3fd..0105d65 100644
--- a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/Plc4xS7Protocol.java
+++ b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/Plc4xS7Protocol.java
@@ -20,24 +20,74 @@ package org.apache.plc4x.java.s7.readwrite.protocol;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-import org.apache.commons.lang3.NotImplementedException;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.plc4x.java.api.exceptions.PlcProtocolException;
 import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
 import org.apache.plc4x.java.api.messages.PlcResponse;
 import org.apache.plc4x.java.api.model.PlcField;
 import org.apache.plc4x.java.api.types.PlcResponseCode;
-import org.apache.plc4x.java.s7.readwrite.*;
-import org.apache.plc4x.java.s7.readwrite.types.*;
+import org.apache.plc4x.java.s7.readwrite.COTPPacket;
+import org.apache.plc4x.java.s7.readwrite.COTPPacketConnectionRequest;
+import org.apache.plc4x.java.s7.readwrite.COTPPacketConnectionResponse;
+import org.apache.plc4x.java.s7.readwrite.COTPPacketData;
+import org.apache.plc4x.java.s7.readwrite.COTPParameter;
+import org.apache.plc4x.java.s7.readwrite.COTPParameterCalledTsap;
+import org.apache.plc4x.java.s7.readwrite.COTPParameterCallingTsap;
+import org.apache.plc4x.java.s7.readwrite.COTPParameterTpduSize;
+import org.apache.plc4x.java.s7.readwrite.S7Address;
+import org.apache.plc4x.java.s7.readwrite.S7AddressAny;
+import org.apache.plc4x.java.s7.readwrite.S7Message;
+import org.apache.plc4x.java.s7.readwrite.S7MessageRequest;
+import org.apache.plc4x.java.s7.readwrite.S7MessageResponse;
+import org.apache.plc4x.java.s7.readwrite.S7MessageUserData;
+import org.apache.plc4x.java.s7.readwrite.S7ParameterReadVarRequest;
+import org.apache.plc4x.java.s7.readwrite.S7ParameterReadVarResponse;
+import org.apache.plc4x.java.s7.readwrite.S7ParameterSetupCommunication;
+import org.apache.plc4x.java.s7.readwrite.S7ParameterUserData;
+import org.apache.plc4x.java.s7.readwrite.S7ParameterUserDataItem;
+import org.apache.plc4x.java.s7.readwrite.S7ParameterUserDataItemCPUFunctions;
+import org.apache.plc4x.java.s7.readwrite.S7PayloadReadVarRequest;
+import org.apache.plc4x.java.s7.readwrite.S7PayloadReadVarResponse;
+import org.apache.plc4x.java.s7.readwrite.S7PayloadSetupCommunication;
+import org.apache.plc4x.java.s7.readwrite.S7PayloadUserData;
+import org.apache.plc4x.java.s7.readwrite.S7PayloadUserDataItem;
+import org.apache.plc4x.java.s7.readwrite.S7PayloadUserDataItemCpuFunctionReadSzlRequest;
+import org.apache.plc4x.java.s7.readwrite.S7PayloadUserDataItemCpuFunctionReadSzlResponse;
+import org.apache.plc4x.java.s7.readwrite.S7VarPayloadDataItem;
+import org.apache.plc4x.java.s7.readwrite.S7VarRequestParameterItem;
+import org.apache.plc4x.java.s7.readwrite.S7VarRequestParameterItemAddress;
+import org.apache.plc4x.java.s7.readwrite.SzlDataTreeItem;
+import org.apache.plc4x.java.s7.readwrite.SzlId;
+import org.apache.plc4x.java.s7.readwrite.TPKTPacket;
+import org.apache.plc4x.java.s7.readwrite.types.COTPProtocolClass;
+import org.apache.plc4x.java.s7.readwrite.types.COTPTpduSize;
+import org.apache.plc4x.java.s7.readwrite.types.DataTransportErrorCode;
+import org.apache.plc4x.java.s7.readwrite.types.DataTransportSize;
+import org.apache.plc4x.java.s7.readwrite.types.S7ControllerType;
+import org.apache.plc4x.java.s7.readwrite.types.SzlModuleTypeClass;
+import org.apache.plc4x.java.s7.readwrite.types.SzlSublist;
 import org.apache.plc4x.java.s7.readwrite.utils.S7Field;
 import org.apache.plc4x.java.spi.ConversationContext;
 import org.apache.plc4x.java.spi.Plc4xProtocolBase;
 import org.apache.plc4x.java.spi.messages.DefaultPlcReadRequest;
 import org.apache.plc4x.java.spi.messages.DefaultPlcReadResponse;
 import org.apache.plc4x.java.spi.messages.InternalPlcReadRequest;
-import org.apache.plc4x.java.spi.messages.PlcRequestContainer;
-import org.apache.plc4x.java.spi.messages.items.*;
+import org.apache.plc4x.java.spi.messages.items.BaseDefaultFieldItem;
+import org.apache.plc4x.java.spi.messages.items.DefaultBigIntegerFieldItem;
+import org.apache.plc4x.java.spi.messages.items.DefaultBooleanFieldItem;
+import org.apache.plc4x.java.spi.messages.items.DefaultByteFieldItem;
+import org.apache.plc4x.java.spi.messages.items.DefaultDoubleFieldItem;
+import org.apache.plc4x.java.spi.messages.items.DefaultFloatFieldItem;
+import org.apache.plc4x.java.spi.messages.items.DefaultIntegerFieldItem;
+import org.apache.plc4x.java.spi.messages.items.DefaultLocalDateFieldItem;
+import org.apache.plc4x.java.spi.messages.items.DefaultLocalDateTimeFieldItem;
+import org.apache.plc4x.java.spi.messages.items.DefaultLocalTimeFieldItem;
+import org.apache.plc4x.java.spi.messages.items.DefaultLongFieldItem;
+import org.apache.plc4x.java.spi.messages.items.DefaultShortFieldItem;
+import org.apache.plc4x.java.spi.messages.items.DefaultStringFieldItem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,7 +99,12 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.temporal.ChronoUnit;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -128,6 +183,40 @@ public class Plc4xS7Protocol extends Plc4xProtocolBase<TPKTPacket> {
             });
     }
 
+    /**
+     * Reads all fields of {@code readRequest} with one S7 read-var request; the returned future fails on timeout (1000 ms), conversation error or decode failure.
+     */
+    @Override public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) {
+        CompletableFuture<PlcReadResponse> future = new CompletableFuture<>();
+        DefaultPlcReadRequest request = (DefaultPlcReadRequest) readRequest;
+        List<S7VarRequestParameterItem> requestItems = new ArrayList<>(request.getNumberOfFields());
+        for (PlcField field : request.getFields()) {
+            requestItems.add(new S7VarRequestParameterItemAddress(toS7Address(field)));
+        }
+        final int tpduId = tpduGenerator.getAndIncrement();
+        TPKTPacket tpktPacket = new TPKTPacket(new COTPPacketData(null, new S7MessageRequest(tpduId,
+            new S7ParameterReadVarRequest(requestItems.toArray(new S7VarRequestParameterItem[0])),
+            new S7PayloadReadVarRequest()), true, (short) tpduId));
+        context.sendRequest(tpktPacket)
+            .expectResponse(TPKTPacket.class, Duration.ofMillis(1000))
+            .onTimeout(future::completeExceptionally)
+            .onError((p, e) -> future.completeExceptionally(e))
+            .check(p -> p.getPayload() instanceof COTPPacketData)
+            .unwrap(p -> ((COTPPacketData) p.getPayload()))
+            .check(p -> p.getPayload() instanceof S7MessageResponse)
+            .unwrap(p -> ((S7MessageResponse) p.getPayload()))
+            .check(p -> p.getTpduReference() == tpduId)
+            .check(p -> p.getParameter() instanceof S7ParameterReadVarResponse)
+            .handle(p -> {
+                try {
+                    future.complete(((PlcReadResponse) decodeReadResponse(p, ((InternalPlcReadRequest) readRequest))));
+                } catch (PlcProtocolException e) {
+                    future.completeExceptionally(e); // fail the future instead of leaving the caller waiting forever
+                }
+            });
+        return future;
+    }
+
     private void extractControllerTypeAndFireConnected(ConversationContext<TPKTPacket> context, S7PayloadUserData payloadUserData) {
         for (S7PayloadUserDataItem item : payloadUserData.getItems()) {
             if (!(item instanceof S7PayloadUserDataItemCpuFunctionReadSzlResponse)) {
@@ -191,45 +280,7 @@ public class Plc4xS7Protocol extends Plc4xProtocolBase<TPKTPacket> {
             }, null, (short) 0x0000, (short) 0x000F, COTPProtocolClass.CLASS_0);
     }
 
-    @Override
-    protected void encode(ConversationContext<TPKTPacket> context, PlcRequestContainer msg) throws Exception {
-        if (msg.getRequest() instanceof DefaultPlcReadRequest) {
-            DefaultPlcReadRequest request = (DefaultPlcReadRequest) msg.getRequest();
-            List<S7VarRequestParameterItem> requestItems = new ArrayList<>(request.getNumberOfFields());
-            for (PlcField field : request.getFields()) {
-                requestItems.add(new S7VarRequestParameterItemAddress(toS7Address(field)));
-            }
-            final int tpduId = tpduGenerator.getAndIncrement();
-            TPKTPacket tpktPacket = new TPKTPacket(new COTPPacketData(null,
-                new S7MessageRequest(tpduId,
-                    new S7ParameterReadVarRequest(requestItems.toArray(new S7VarRequestParameterItem[0])),
-                    new S7PayloadReadVarRequest()),
-                true, (short) tpduId));
-
-            context.sendRequest(tpktPacket)
-                .expectResponse(TPKTPacket.class, Duration.ofMillis(1000))
-                // TODO: make it really optional
-                .check(p -> p.getPayload() instanceof COTPPacketData)
-                .unwrap(p -> ((COTPPacketData) p.getPayload()))
-                .check(p -> p.getPayload() instanceof S7MessageResponse)
-                .unwrap(p -> ((S7MessageResponse) p.getPayload()))
-                .check(p -> p.getTpduReference() == tpduId)
-                .check(p -> p.getParameter() instanceof S7ParameterReadVarResponse)
-                .handle(p -> {
-                    try {
-                        msg.getResponseFuture().complete(decodeReadResponse(p, msg));
-                    } catch (PlcProtocolException e) {
-                        e.printStackTrace();
-                    }
-                });
-        } else {
-            throw new NotImplementedException("Implement me");
-        }
-    }
-
-    private PlcResponse decodeReadResponse(S7MessageResponse responseMessage, PlcRequestContainer requestContainer) throws PlcProtocolException {
-        InternalPlcReadRequest plcReadRequest = (InternalPlcReadRequest) requestContainer.getRequest();
-
+    private PlcResponse decodeReadResponse(S7MessageResponse responseMessage, InternalPlcReadRequest plcReadRequest) throws PlcProtocolException {
         S7PayloadReadVarResponse payload = (S7PayloadReadVarResponse) responseMessage.getPayload();
 
         // If the numbers of items don't match, we're in big trouble as the only
diff --git a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7Protocol.java b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7Protocol.java
index c98aaad..cdc5feb 100644
--- a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7Protocol.java
+++ b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7Protocol.java
@@ -19,9 +19,9 @@ under the License.
 package org.apache.plc4x.java.s7.readwrite.protocol;
 
 import io.netty.buffer.ByteBuf;
-import org.apache.plc4x.java.spi.GeneratedDriverByteToMessageCodec;
-import org.apache.plc4x.java.s7.readwrite.*;
+import org.apache.plc4x.java.s7.readwrite.TPKTPacket;
 import org.apache.plc4x.java.s7.readwrite.io.TPKTPacketIO;
+import org.apache.plc4x.java.spi.GeneratedDriverByteToMessageCodec;
 import org.apache.plc4x.java.utils.MessageIO;
 import org.apache.plc4x.java.utils.ParseException;
 import org.apache.plc4x.java.utils.ReadBuffer;