You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2019/12/20 15:39:15 UTC

[plc4x] branch next-gen-core updated: - Continued working on the drivers ...

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch next-gen-core
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/next-gen-core by this push:
     new 4d34052  - Continued working on the drivers ...
4d34052 is described below

commit 4d34052b7724992104339d9d1d4bd0e0171edcc7
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Fri Dec 20 16:39:07 2019 +0100

    - Continued working on the drivers ...
---
 .../apache/plc4x/java/spi/ConversationContext.java |   5 +
 .../apache/plc4x/java/spi/Plc4xNettyWrapper.java   |  38 ++-
 .../apache/plc4x/java/spi/Plc4xProtocolBase.java   |   4 +
 .../plc4x/java/spi/parser/ConnectionParser.java    |  20 +-
 .../java/spi/parser/ConnectionParserTest.java      |   4 +-
 .../connection/PassiveKnxNetIpPlcConnection.java   |   4 +-
 ...pProtocol.java => KnxNetIpProtocolMessage.java} |   4 +-
 sandbox/test-java-knxnetip-driver/pom.xml          |  16 +-
 .../apache/plc4x/java/knxnetip/KnxNetIpDriver.java |  31 +--
 .../connection/KnxNetIpConfiguration.java}         |  23 +-
 .../knxnetip/connection/KnxNetIpConnection.java    |  53 ++--
 .../knxnetip/protocol/KnxNetIpPlc4xProtocol.java   |  61 -----
 .../knxnetip/protocol/KnxNetIpProtocolLogic.java   | 302 ++++++++++++++-------
 ...olPackets.java => KnxNetIpProtocolMessage.java} |   4 +-
 .../apache/plc4x/java/knxnetip/ManualKnxNetIp.java |   2 +-
 .../java/knxnetip/ManualKnxNetIpWithEts5.java      |   4 +-
 .../src/test/resources/logback.xml                 |  36 +++
 .../apache/plc4x/java/s7/readwrite/S7Driver.java   |   2 +-
 .../s7/readwrite/connection/S7Configuration.java   |   4 +-
 .../java/s7/readwrite/connection/S7Connection.java |  25 --
 .../s7/readwrite/protocol/S7ProtocolLogic.java     |  30 +-
 .../adapters/source/knxnetip/KnxNetIpAdapter.java  |   4 +-
 22 files changed, 386 insertions(+), 290 deletions(-)

diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/ConversationContext.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/ConversationContext.java
index e96dc6d..b6c6886 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/ConversationContext.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/ConversationContext.java
@@ -19,6 +19,7 @@
 
 package org.apache.plc4x.java.spi;
 
+import io.netty.channel.Channel;
 import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
 
 import java.time.Duration;
@@ -30,10 +31,14 @@ import java.util.function.Predicate;
 
 public interface ConversationContext<T> {
 
+    Channel getChannel();
+
     void sendToWire(T msg);
 
     void fireConnected();
 
+    void fireDisconnected();
+
     SendRequestContext<T> sendRequest(T packet);
 
     interface SendRequestContext<T> {
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
index 37f2145..3d0dd27 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
@@ -19,15 +19,17 @@
 
 package org.apache.plc4x.java.spi;
 
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPipeline;
 import io.netty.handler.codec.MessageToMessageCodec;
 import io.vavr.control.Either;
 import org.apache.plc4x.java.spi.events.ConnectEvent;
 import org.apache.plc4x.java.spi.events.ConnectedEvent;
+import org.apache.plc4x.java.spi.events.DisconnectEvent;
+import org.apache.plc4x.java.spi.events.DisconnectedEvent;
 import org.apache.plc4x.java.spi.internal.DefaultSendRequestContext;
 import org.apache.plc4x.java.spi.internal.HandlerRegistration;
-import org.apache.plc4x.java.spi.messages.PlcRequestContainer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,15 +57,28 @@ public class Plc4xNettyWrapper<T> extends MessageToMessageCodec<T, Object> {
         this.registeredHandlers = new ConcurrentLinkedQueue<>();
         this.protocolBase = protocol;
         this.protocolBase.setContext(new ConversationContext<T>() {
-            @Override public void sendToWire(T msg) {
+            @Override
+            public Channel getChannel() {
+                return pipeline.channel();
+            }
+
+            @Override
+            public void sendToWire(T msg) {
                 pipeline.writeAndFlush(msg);
             }
 
-            @Override public void fireConnected() {
+            @Override
+            public void fireConnected() {
                 pipeline.fireUserEventTriggered(ConnectedEvent.class);
             }
 
-            @Override public SendRequestContext<T> sendRequest(T packet) {
+            @Override
+            public void fireDisconnected() {
+                pipeline.fireUserEventTriggered(DisconnectedEvent.class);
+            }
+
+            @Override
+            public SendRequestContext<T> sendRequest(T packet) {
                 return new DefaultSendRequestContext<T>(handler -> {
                     logger.trace("Adding Response Handler...");
                     registeredHandlers.add(handler);
@@ -83,7 +98,7 @@ public class Plc4xNettyWrapper<T> extends MessageToMessageCodec<T, Object> {
 //            }
 //        }, plcRequestContainer);
         // NOOP
-        logger.info("Forwarding request to plc {}", msg);
+        logger.debug("Forwarding request to plc {}", msg);
         list.add(msg);
     }
 
@@ -138,6 +153,8 @@ public class Plc4xNettyWrapper<T> extends MessageToMessageCodec<T, Object> {
         // by sending a connection request to the plc.
         if (evt instanceof ConnectEvent) {
             this.protocolBase.onConnect(new DefaultConversationContext<>(ctx));
+        } else if (evt instanceof DisconnectEvent) {
+            this.protocolBase.onDisconnect(new DefaultConversationContext<>(ctx));
         } else {
             super.userEventTriggered(ctx, evt);
         }
@@ -151,6 +168,11 @@ public class Plc4xNettyWrapper<T> extends MessageToMessageCodec<T, Object> {
         }
 
         @Override
+        public Channel getChannel() {
+            return channelHandlerContext.channel();
+        }
+
+        @Override
         public void sendToWire(T1 msg) {
             logger.trace("Sending to wire {}", msg);
             channelHandlerContext.channel().writeAndFlush(msg);
@@ -163,6 +185,12 @@ public class Plc4xNettyWrapper<T> extends MessageToMessageCodec<T, Object> {
         }
 
         @Override
+        public void fireDisconnected() {
+            logger.trace("Firing Disconnected!");
+            channelHandlerContext.pipeline().fireUserEventTriggered(new DisconnectedEvent());
+        }
+
+        @Override
         public SendRequestContext<T1> sendRequest(T1 packet) {
             return new DefaultSendRequestContext<T1>(handler -> {
                 logger.trace("Adding Response Handler...");
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xProtocolBase.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xProtocolBase.java
index 24f51f6..42131d7 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xProtocolBase.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xProtocolBase.java
@@ -44,6 +44,10 @@ public abstract class Plc4xProtocolBase<T> {
         // Intentionally do nothing here
     }
 
+    public void onDisconnect(ConversationContext<T> context) {
+        // Intentionally do nothing here
+    }
+
     /**
      * TODO document me
      * <p>
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/parser/ConnectionParser.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/parser/ConnectionParser.java
index 50d712f..244a1f8 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/parser/ConnectionParser.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/parser/ConnectionParser.java
@@ -56,22 +56,32 @@ public class ConnectionParser {
             URI url = new URI(string);
             Map<String, List<String>> stringListMap = splitQuery(url);
 
-            // TODO notify on umatched parameters
+            // TODO notify on unmatched parameters
 
             Iterator<Map.Entry<String, Field>> iterator = fieldMap.entrySet().iterator();
             for (Iterator<Map.Entry<String, Field>> iter = iterator; iter.hasNext(); ) {
                 Map.Entry<String, Field> entry = iter.next();
                 // TODO field name also from annotation
                 if (stringListMap.containsKey(entry.getKey())) {
-                    fieldMap.get(entry.getKey()).setAccessible(true);
-                    fieldMap.get(entry.getKey()).setInt(instance, Integer.parseInt(stringListMap.get(entry.getKey()).get(0)));
+                    final Field field = fieldMap.get(entry.getKey());
+                    field.setAccessible(true);
+                    if (field.getType().isAssignableFrom(String.class)) {
+                        field.set(instance, stringListMap.get(entry.getKey()).get(0));
+                    } else if (field.getType().isAssignableFrom(int.class)) {
+                        field.setInt(instance, Integer.parseInt(stringListMap.get(entry.getKey()).get(0)));
+                    }
                     iter.remove();
                 } else {
                     // TODO Implement other types
                     IntDefaultValue intDefaultValue = fieldMap.get(entry.getKey()).getAnnotation(IntDefaultValue.class);
                     if (intDefaultValue != null) {
-                        fieldMap.get(entry.getKey()).setAccessible(true);
-                        fieldMap.get(entry.getKey()).setInt(instance, intDefaultValue.value());
+                        final Field field = fieldMap.get(entry.getKey());
+                        field.setAccessible(true);
+                        if (field.getType().isAssignableFrom(String.class)) {
+                            //field.set(instance, stringListMap.get(entry.getKey()).get(0));
+                        } else if (field.getType().isAssignableFrom(int.class)) {
+                            field.setInt(instance, intDefaultValue.value());
+                        }
                         iter.remove();
                     }
                 }
diff --git a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/parser/ConnectionParserTest.java b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/parser/ConnectionParserTest.java
index 36100fc..22a31b8 100644
--- a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/parser/ConnectionParserTest.java
+++ b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/parser/ConnectionParserTest.java
@@ -27,8 +27,8 @@ class ConnectionParserTest {
 
     @Test
     void parse() {
-        ConnectionParser parster = new ConnectionParser();
-        PropertiesDescriptor properties = parster.parse("s7://192.168.167.1?rackId=1", PropertiesDescriptor.class);
+        ConnectionParser parser = new ConnectionParser();
+        PropertiesDescriptor properties = parser.parse("s7://192.168.167.1?rackId=1", PropertiesDescriptor.class);
 
         assertEquals(1, properties.rackId);
         assertEquals(1, properties.slotId);
diff --git a/sandbox/test-java-knxnetip-driver-passive/src/main/java/org/apache/plc4x/java/passive/knxnetip/connection/PassiveKnxNetIpPlcConnection.java b/sandbox/test-java-knxnetip-driver-passive/src/main/java/org/apache/plc4x/java/passive/knxnetip/connection/PassiveKnxNetIpPlcConnection.java
index b94ff6a..c710d94 100644
--- a/sandbox/test-java-knxnetip-driver-passive/src/main/java/org/apache/plc4x/java/passive/knxnetip/connection/PassiveKnxNetIpPlcConnection.java
+++ b/sandbox/test-java-knxnetip-driver-passive/src/main/java/org/apache/plc4x/java/passive/knxnetip/connection/PassiveKnxNetIpPlcConnection.java
@@ -28,7 +28,7 @@ import org.apache.plc4x.java.spi.connection.ChannelFactory;
 import org.apache.plc4x.java.spi.connection.NettyPlcConnection;
 import org.apache.plc4x.java.spi.events.ConnectedEvent;
 import org.apache.plc4x.java.passive.knxnetip.model.KnxNetIpField;
-import org.apache.plc4x.java.passive.knxnetip.protocol.KnxNetIpProtocol;
+import org.apache.plc4x.java.passive.knxnetip.protocol.KnxNetIpProtocolMessage;
 import org.apache.plc4x.java.spi.messages.*;
 import org.apache.plc4x.java.utils.rawsockets.netty.RawSocketAddress;
 import org.apache.plc4x.java.utils.rawsockets.netty.RawSocketIpAddress;
@@ -82,7 +82,7 @@ public class PassiveKnxNetIpPlcConnection extends NettyPlcConnection implements
                         }
                     }
                 });
-                pipeline.addLast(new KnxNetIpProtocol());
+                pipeline.addLast(new KnxNetIpProtocolMessage());
                 pipeline.addLast(handler);
             }
         };
diff --git a/sandbox/test-java-knxnetip-driver-passive/src/main/java/org/apache/plc4x/java/passive/knxnetip/protocol/KnxNetIpProtocol.java b/sandbox/test-java-knxnetip-driver-passive/src/main/java/org/apache/plc4x/java/passive/knxnetip/protocol/KnxNetIpProtocolMessage.java
similarity index 93%
rename from sandbox/test-java-knxnetip-driver-passive/src/main/java/org/apache/plc4x/java/passive/knxnetip/protocol/KnxNetIpProtocol.java
rename to sandbox/test-java-knxnetip-driver-passive/src/main/java/org/apache/plc4x/java/passive/knxnetip/protocol/KnxNetIpProtocolMessage.java
index 915b3fb..d65e1c9 100644
--- a/sandbox/test-java-knxnetip-driver-passive/src/main/java/org/apache/plc4x/java/passive/knxnetip/protocol/KnxNetIpProtocol.java
+++ b/sandbox/test-java-knxnetip-driver-passive/src/main/java/org/apache/plc4x/java/passive/knxnetip/protocol/KnxNetIpProtocolMessage.java
@@ -27,9 +27,9 @@ import org.apache.plc4x.java.utils.ParseException;
 import org.apache.plc4x.java.utils.ReadBuffer;
 import org.apache.plc4x.java.utils.WriteBuffer;
 
-public class KnxNetIpProtocol extends GeneratedDriverByteToMessageCodec<KNXNetIPMessage> {
+public class KnxNetIpProtocolMessage extends GeneratedDriverByteToMessageCodec<KNXNetIPMessage> {
 
-    public KnxNetIpProtocol() {
+    public KnxNetIpProtocolMessage() {
         super(new MessageIO<KNXNetIPMessage, KNXNetIPMessage>() {
             @Override
             public KNXNetIPMessage parse(ReadBuffer io) throws ParseException {
diff --git a/sandbox/test-java-knxnetip-driver/pom.xml b/sandbox/test-java-knxnetip-driver/pom.xml
index b0eb8f1..e11d274 100644
--- a/sandbox/test-java-knxnetip-driver/pom.xml
+++ b/sandbox/test-java-knxnetip-driver/pom.xml
@@ -66,6 +66,12 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.plc4x.sandbox</groupId>
+      <artifactId>test-java-knxnetip-shared</artifactId>
+      <version>0.6.0-SNAPSHOT</version>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.plc4x</groupId>
       <artifactId>plc4j-transport-udp</artifactId>
       <version>0.6.0-SNAPSHOT</version>
@@ -80,6 +86,10 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
     </dependency>
+    <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>org.apache.plc4x</groupId>
@@ -95,12 +105,6 @@
     </dependency>
 
     <dependency>
-      <groupId>org.apache.plc4x.sandbox</groupId>
-      <artifactId>test-java-knxnetip-shared</artifactId>
-      <version>0.6.0-SNAPSHOT</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
       <groupId>ch.qos.logback</groupId>
       <artifactId>logback-classic</artifactId>
       <scope>test</scope>
diff --git a/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/KnxNetIpDriver.java b/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/KnxNetIpDriver.java
index 476d8d7..2678a8a 100644
--- a/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/KnxNetIpDriver.java
+++ b/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/KnxNetIpDriver.java
@@ -22,18 +22,12 @@ import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.authentication.PlcAuthentication;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.knxnetip.connection.KnxNetIpConnection;
-import org.apache.plc4x.java.knxnetip.protocol.KnxNetIpPlc4xProtocol;
 import org.apache.plc4x.java.api.PlcDriver;
 
-import java.net.InetAddress;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import java.net.*;
 
 public class KnxNetIpDriver implements PlcDriver {
 
-    private static final Pattern PASSIVE_KNXNET_IP_URI_PATTERN =
-        Pattern.compile("^knxnet-ip://(?<host>.*)(?<params>\\?.*)?");
-
     @Override
     public String getProtocolCode() {
         return "knxnet-ip";
@@ -45,19 +39,22 @@ public class KnxNetIpDriver implements PlcDriver {
     }
 
     @Override
-    public PlcConnection connect(String url) throws PlcConnectionException {
-        Matcher matcher = PASSIVE_KNXNET_IP_URI_PATTERN.matcher(url);
-        if (!matcher.matches()) {
-            throw new PlcConnectionException(
-                "Connection url doesn't match the format 'knxnet-ip://{host|ip}'");
+    public PlcConnection connect(String connectionString) throws PlcConnectionException {
+        URL url;
+        try {
+            url = new URL(null, connectionString, new URLStreamHandler() {
+                @Override
+                protected URLConnection openConnection(URL u) {
+                    return null;
+                }
+            });
+        } catch (MalformedURLException e) {
+            throw new PlcConnectionException("Error parsing connection string " + connectionString, e);
         }
-        String host = matcher.group("host");
-
-        String params = matcher.group("params") != null ? matcher.group("params").substring(1) : null;
 
         try {
-            InetAddress serverInetAddress = InetAddress.getByName(host);
-            PlcConnection connection = new KnxNetIpConnection(serverInetAddress, params, new KnxNetIpPlc4xProtocol());
+            InetAddress serverInetAddress = InetAddress.getByName(url.getHost());
+            PlcConnection connection = new KnxNetIpConnection(serverInetAddress, url.getQuery());
             Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                 try {
                     connection.close();
diff --git a/sandbox/test-java-knxnetip-driver/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIp.java b/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/connection/KnxNetIpConfiguration.java
similarity index 60%
copy from sandbox/test-java-knxnetip-driver/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIp.java
copy to sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/connection/KnxNetIpConfiguration.java
index 0732417..eb53264 100644
--- a/sandbox/test-java-knxnetip-driver/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIp.java
+++ b/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/connection/KnxNetIpConfiguration.java
@@ -16,19 +16,24 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 */
-package org.apache.plc4x.java.knxnetip;
+package org.apache.plc4x.java.knxnetip.connection;
 
-import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.spi.parser.ConfigurationParameter;
 
-import java.util.concurrent.TimeUnit;
+public class KnxNetIpConfiguration {
 
-public class ManualKnxNetIp {
+    @ConfigurationParameter("knxproj-file-path")
+    public String knxprojFilePath;
 
-    public static void main(String[] args) throws Exception {
-        final PlcConnection connection = new PlcDriverManager().getConnection("knxnet-ip://192.168.42.11");
-        TimeUnit.SECONDS.sleep(300);
-        connection.close();
+    @ConfigurationParameter
+    public int groupAddressType = 3;
+
+    @Override
+    public String toString() {
+        return "Configuration{" +
+            "knxprojFilePath=" + knxprojFilePath + ", " +
+            "groupAddressType=" + groupAddressType +
+            '}';
     }
 
 }
diff --git a/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/connection/KnxNetIpConnection.java b/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/connection/KnxNetIpConnection.java
index 827fd46..cd0b941 100644
--- a/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/connection/KnxNetIpConnection.java
+++ b/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/connection/KnxNetIpConnection.java
@@ -21,10 +21,12 @@ package org.apache.plc4x.java.knxnetip.connection;
 import io.netty.channel.*;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
-import org.apache.plc4x.java.api.messages.PlcReadResponse;
 import org.apache.plc4x.java.api.model.PlcField;
 import org.apache.plc4x.java.base.connection.UdpSocketChannelFactory;
 import org.apache.plc4x.java.base.connection.protocol.DatagramUnpackingHandler;
+import org.apache.plc4x.java.knxnetip.readwrite.KNXNetIPMessage;
+import org.apache.plc4x.java.spi.Plc4xNettyWrapper;
+import org.apache.plc4x.java.spi.Plc4xProtocolBase;
 import org.apache.plc4x.java.spi.connection.ChannelFactory;
 import org.apache.plc4x.java.spi.connection.NettyPlcConnection;
 import org.apache.plc4x.java.spi.events.ConnectEvent;
@@ -32,8 +34,9 @@ import org.apache.plc4x.java.spi.events.ConnectedEvent;
 import org.apache.plc4x.java.spi.events.DisconnectEvent;
 import org.apache.plc4x.java.knxnetip.model.KnxNetIpField;
 import org.apache.plc4x.java.knxnetip.protocol.KnxNetIpProtocolLogic;
-import org.apache.plc4x.java.knxnetip.protocol.KnxNetIpProtocolPackets;
+import org.apache.plc4x.java.knxnetip.protocol.KnxNetIpProtocolMessage;
 import org.apache.plc4x.java.spi.messages.*;
+import org.apache.plc4x.java.spi.parser.ConnectionParser;
 
 import java.net.InetAddress;
 import java.util.concurrent.CompletableFuture;
@@ -45,21 +48,15 @@ public class KnxNetIpConnection extends NettyPlcConnection implements PlcReader
 
     public static final int KNXNET_IP_PORT = 3671;
 
-    private final ChannelHandler handler;
+    private final KnxNetIpConfiguration configuration;
 
-    public KnxNetIpConnection(InetAddress address, String params, ChannelHandler handler) {
-        this(new UdpSocketChannelFactory(address, KNXNET_IP_PORT), params, handler);
+    public KnxNetIpConnection(InetAddress address, String params) {
+        this(new UdpSocketChannelFactory(address, KNXNET_IP_PORT), params);
     }
 
-    public KnxNetIpConnection(ChannelFactory channelFactory, String params, ChannelHandler handler) {
+    public KnxNetIpConnection(ChannelFactory channelFactory, String params) {
         super(channelFactory, true);
-        this.handler = handler;
-    }
-
-    @Override
-    protected void sendChannelCreatedEvent() {
-        // Send an event to the pipeline telling the Protocol filters what's going on.
-        channel.pipeline().fireUserEventTriggered(new ConnectEvent());
+        configuration = ConnectionParser.parse("a://1.1.1.1?" + params, KnxNetIpConfiguration.class);
     }
 
     @Override
@@ -86,9 +83,13 @@ public class KnxNetIpConnection extends NettyPlcConnection implements PlcReader
                 });
                 // Unpack the ByteBuf included in the DatagramPackage.
                 pipeline.addLast(new DatagramUnpackingHandler());
-                pipeline.addLast(new KnxNetIpProtocolPackets());
-                pipeline.addLast(new KnxNetIpProtocolLogic());
-                pipeline.addLast(handler);
+                pipeline.addLast(new KnxNetIpProtocolMessage());
+
+                Plc4xProtocolBase<KNXNetIPMessage> knxNetIpProtocolLogic = new KnxNetIpProtocolLogic(configuration);
+                setProtocol(knxNetIpProtocolLogic);
+                Plc4xNettyWrapper<KNXNetIPMessage> context =
+                    new Plc4xNettyWrapper<>(pipeline, knxNetIpProtocolLogic, KNXNetIPMessage.class);
+                pipeline.addLast(context);
             }
         };
     }
@@ -105,6 +106,11 @@ public class KnxNetIpConnection extends NettyPlcConnection implements PlcReader
 
     @Override
     public void close() throws PlcConnectionException {
+        if(channel == null) {
+            super.close();
+            return;
+        }
+
         CompletableFuture<Void> disconnectFuture = new CompletableFuture<>();
         channel.pipeline().fireUserEventTriggered(new DisconnectEvent(disconnectFuture));
         try {
@@ -120,18 +126,9 @@ public class KnxNetIpConnection extends NettyPlcConnection implements PlcReader
     }
 
     @Override
-    public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) {
-        InternalPlcReadRequest internalReadRequest = checkInternal(readRequest, InternalPlcReadRequest.class);
-        CompletableFuture<InternalPlcReadResponse> future = new CompletableFuture<>();
-        PlcRequestContainer<InternalPlcReadRequest, InternalPlcReadResponse> container =
-            new PlcRequestContainer<>(internalReadRequest, future);
-        channel.writeAndFlush(container).addListener(f -> {
-            if (!f.isSuccess()) {
-                future.completeExceptionally(f.cause());
-            }
-        });
-        return future
-            .thenApply(PlcReadResponse.class::cast);
+    protected void sendChannelCreatedEvent() {
+        // Send an event to the pipeline telling the Protocol filters what's going on.
+        channel.pipeline().fireUserEventTriggered(new ConnectEvent());
     }
 
 }
diff --git a/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/protocol/KnxNetIpPlc4xProtocol.java b/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/protocol/KnxNetIpPlc4xProtocol.java
deleted file mode 100644
index d90534a..0000000
--- a/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/protocol/KnxNetIpPlc4xProtocol.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.plc4x.java.knxnetip.protocol;
-
-import io.netty.channel.ChannelHandlerContext;
-import org.apache.commons.codec.binary.Hex;
-import org.apache.plc4x.java.spi.PlcMessageToMessageCodec;
-import org.apache.plc4x.java.spi.messages.PlcRequestContainer;
-import org.apache.plc4x.java.knxnetip.readwrite.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-public class KnxNetIpPlc4xProtocol extends PlcMessageToMessageCodec<KNXNetIPMessage, PlcRequestContainer> {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(KnxNetIpPlc4xProtocol.class);
-
-    @Override
-    protected void encode(ChannelHandlerContext ctx, PlcRequestContainer msg, List<Object> out) {
-
-    }
-
-    @Override
-    protected void decode(ChannelHandlerContext ctx, KNXNetIPMessage msg, List<Object> out) {
-        if(msg instanceof TunnelingRequest) {
-            TunnelingRequest tunnelingRequest = (TunnelingRequest) msg;
-            CEMIBusmonInd busmonInd = (CEMIBusmonInd) tunnelingRequest.getCemi();
-            if (busmonInd.getCemiFrame() instanceof CEMIFrameData) {
-                outputStringRepresentation((CEMIFrameData) busmonInd.getCemiFrame());
-            }
-        }
-    }
-
-    private void outputStringRepresentation(CEMIFrameData data) {
-        final KNXAddress sourceAddress = data.getSourceAddress();
-        final byte[] destinationAddress = data.getDestinationAddress();
-        final boolean groupAddress = data.getGroupAddress();
-        final byte[] payload = new byte[data.getData().length + 1];
-        payload[0] = data.getDataFirstByte();
-        System.arraycopy(data.getData(), 0, payload, 1, data.getData().length);
-        String payloadString = Hex.encodeHexString(payload);
-    }
-
-}
diff --git a/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/protocol/KnxNetIpProtocolLogic.java b/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/protocol/KnxNetIpProtocolLogic.java
index 88c33b8..fe0dee9 100644
--- a/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/protocol/KnxNetIpProtocolLogic.java
+++ b/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/protocol/KnxNetIpProtocolLogic.java
@@ -18,26 +18,32 @@ under the License.
 */
 package org.apache.plc4x.java.knxnetip.protocol;
 
-import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.socket.DatagramChannel;
-import org.apache.plc4x.java.spi.PlcMessageToMessageCodec;
-import org.apache.plc4x.java.spi.events.ConnectEvent;
-import org.apache.plc4x.java.spi.events.ConnectedEvent;
-import org.apache.plc4x.java.spi.events.DisconnectEvent;
-import org.apache.plc4x.java.spi.events.DisconnectedEvent;
-import org.apache.plc4x.java.knxnetip.events.KnxGatewayFoundEvent;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.plc4x.java.ets5.passive.*;
+import org.apache.plc4x.java.ets5.passive.io.KNXGroupAddressIO;
+import org.apache.plc4x.java.ets5.passive.io.KnxDatapointIO;
+import org.apache.plc4x.java.knxnetip.connection.KnxNetIpConfiguration;
+import org.apache.plc4x.java.knxnetip.ets5.Ets5Parser;
+import org.apache.plc4x.java.knxnetip.ets5.model.Ets5Model;
+import org.apache.plc4x.java.knxnetip.ets5.model.GroupAddress;
+import org.apache.plc4x.java.spi.ConversationContext;
+import org.apache.plc4x.java.spi.Plc4xProtocolBase;
 import org.apache.plc4x.java.knxnetip.readwrite.*;
 import org.apache.plc4x.java.knxnetip.readwrite.types.HostProtocolCode;
 import org.apache.plc4x.java.knxnetip.readwrite.types.KnxLayer;
 import org.apache.plc4x.java.knxnetip.readwrite.types.Status;
+import org.apache.plc4x.java.utils.ReadBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.net.InetSocketAddress;
+import java.time.Duration;
 import java.util.*;
-import java.util.concurrent.CompletableFuture;
 
-public class KnxNetIpProtocolLogic extends PlcMessageToMessageCodec<KNXNetIPMessage, KNXNetIPMessage> {
+public class KnxNetIpProtocolLogic extends Plc4xProtocolBase<KNXNetIPMessage> {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(KnxNetIpProtocolLogic.class);
 
@@ -48,126 +54,214 @@ public class KnxNetIpProtocolLogic extends PlcMessageToMessageCodec<KNXNetIPMess
     private short communicationChannelId;
 
     private Timer connectionStateTimer;
-    private CompletableFuture<Void> disconnectFuture;
 
-    @Override
-    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
-        if (evt instanceof ConnectEvent) {
-            DatagramChannel channel = (DatagramChannel) ctx.pipeline().channel();
-            final InetSocketAddress localSocketAddress = channel.localAddress();
-            localIPAddress = new IPAddress(localSocketAddress.getAddress().getAddress());
-            localPort = localSocketAddress.getPort();
-            SearchRequest searchRequest = new SearchRequest(
-                new HPAIDiscoveryEndpoint(HostProtocolCode.IPV4_UDP, localIPAddress, localPort));
-            ctx.channel().writeAndFlush(searchRequest);
-        } else if(evt instanceof KnxGatewayFoundEvent) {
-            DatagramChannel channel = (DatagramChannel) ctx.pipeline().channel();
-            final InetSocketAddress localSocketAddress = channel.localAddress();
-            localIPAddress = new IPAddress(localSocketAddress.getAddress().getAddress());
-            localPort = localSocketAddress.getPort();
-            ConnectionRequest connectionRequest = new ConnectionRequest(
-                new HPAIDiscoveryEndpoint(HostProtocolCode.IPV4_UDP, localIPAddress, localPort),
-                new HPAIDataEndpoint(HostProtocolCode.IPV4_UDP, localIPAddress, localPort),
-                new ConnectionRequestInformationTunnelConnection(KnxLayer.TUNNEL_BUSMONITOR));
-            ctx.channel().writeAndFlush(connectionRequest);
-        } else if(evt instanceof DisconnectEvent) {
-            DisconnectEvent disconnectEvent = (DisconnectEvent) evt;
-            disconnectFuture = disconnectEvent.getFuture();
-            DisconnectRequest disconnectRequest = new DisconnectRequest(communicationChannelId,
-                new HPAIControlEndpoint(HostProtocolCode.IPV4_UDP, localIPAddress, localPort));
-            ctx.channel().writeAndFlush(disconnectRequest);
+    private byte groupAddressType;
+    private Ets5Model ets5Model;
+
+    public KnxNetIpProtocolLogic(KnxNetIpConfiguration configuration) {
+        if(configuration.knxprojFilePath != null) {
+            File knxprojFile = new File(configuration.knxprojFilePath);
+            if(knxprojFile.exists() && knxprojFile.isFile()) {
+                ets5Model = new Ets5Parser().parse(knxprojFile);
+                groupAddressType = ets5Model.getGroupAddressType();
+            } else {
+                throw new RuntimeException(String.format(
+                    "File specified with 'knxproj-file-path' does not exist or is not a file: '%s'",
+                    configuration.knxprojFilePath));
+            }
         }
     }
 
     @Override
-    protected void encode(ChannelHandlerContext ctx, KNXNetIPMessage msg, List<Object> out) {
-        out.add(msg);
+    public void onConnect(ConversationContext<KNXNetIPMessage> context) {
+        DatagramChannel channel = (DatagramChannel) context.getChannel();
+        final InetSocketAddress localSocketAddress = channel.localAddress();
+        localIPAddress = new IPAddress(localSocketAddress.getAddress().getAddress());
+        localPort = localSocketAddress.getPort();
+
+        // First send out a search request
+        // REMARK: This might be optional ... usually we would send a search request to ip 224.0.23.12
+        // Any KNX Gateway will respond with a search response. We're currently directly sending to the
+        // known gateway address, so it's sort of pointless, but at least only one device will respond.
+        SearchRequest searchRequest = new SearchRequest(
+            new HPAIDiscoveryEndpoint(HostProtocolCode.IPV4_UDP, localIPAddress, localPort));
+        context.sendRequest(searchRequest)
+            .expectResponse(KNXNetIPMessage.class, Duration.ofMillis(1000))
+            .check(p -> p instanceof SearchResponse)
+            .unwrap(p -> (SearchResponse) p)
+            .handle(searchResponse -> {
+                // Check if this device supports tunneling services.
+                final ServiceId tunnelingService = Arrays.stream(searchResponse.getDibSuppSvcFamilies().getServiceIds()).filter(serviceId -> serviceId instanceof KnxNetIpTunneling).findFirst().orElse(null);
+
+                // If this device supports this type of service, tell the driver, we found a suitable device.
+                if(tunnelingService != null) {
+                    // Extract the required information from the search response.
+                    gatewayAddress = searchResponse.getDibDeviceInfo().getKnxAddress();
+                    gatewayName = new String(searchResponse.getDibDeviceInfo().getDeviceFriendlyName()).trim();
+
+                    LOGGER.info(String.format("Found KNX Gateway '%s' with KNX address '%d.%d.%d'", gatewayName,
+                        gatewayAddress.getMainGroup(), gatewayAddress.getMiddleGroup(), gatewayAddress.getSubGroup()));
+
+                    // Next send a connection request to the gateway.
+                    ConnectionRequest connectionRequest = new ConnectionRequest(
+                        new HPAIDiscoveryEndpoint(HostProtocolCode.IPV4_UDP, localIPAddress, localPort),
+                        new HPAIDataEndpoint(HostProtocolCode.IPV4_UDP, localIPAddress, localPort),
+                        new ConnectionRequestInformationTunnelConnection(KnxLayer.TUNNEL_BUSMONITOR));
+                    context.sendRequest(connectionRequest)
+                        .expectResponse(KNXNetIPMessage.class, Duration.ofMillis(1000))
+                        .check(p -> p instanceof ConnectionResponse)
+                        .unwrap(p -> (ConnectionResponse) p)
+                        .handle(connectionResponse -> {
+                            // Remember the communication channel id.
+                            communicationChannelId = connectionResponse.getCommunicationChannelId();
+
+                            // Check if everything went well.
+                            Status status = connectionResponse.getStatus();
+                            if (status == Status.NO_ERROR) {
+                                LOGGER.info(String.format("Connected to KNX Gateway '%s' with KNX address '%d.%d.%d'", gatewayName,
+                                    gatewayAddress.getMainGroup(), gatewayAddress.getMiddleGroup(), gatewayAddress.getSubGroup()));
+
+                                // Send an event that connection setup is complete.
+                                context.fireConnected();
+
+                                // Start a timer to check the connection state every 60 seconds.
+                                // This keeps the connection open if no data is transported.
+                                // Otherwise the gateway will terminate the connection.
+                                connectionStateTimer = new Timer();
+                                connectionStateTimer.scheduleAtFixedRate(new TimerTask() {
+                                    @Override
+                                    public void run() {
+                                        ConnectionStateRequest connectionStateRequest =
+                                            new ConnectionStateRequest(communicationChannelId,
+                                                new HPAIControlEndpoint(HostProtocolCode.IPV4_UDP, localIPAddress, localPort));
+                                        context.sendRequest(connectionStateRequest)
+                                            .expectResponse(KNXNetIPMessage.class, Duration.ofMillis(1000))
+                                            .check(p -> p instanceof ConnectionStateResponse)
+                                            .unwrap(p -> (ConnectionStateResponse) p)
+                                            .handle(connectionStateResponse -> {
+                                                if(connectionStateResponse.getStatus() != Status.NO_ERROR) {
+                                                    if(connectionStateResponse.getStatus() != null) {
+                                                        LOGGER.error(String.format("Connection state problems. Got %s",
+                                                            connectionStateResponse.getStatus().name()));
+                                                    } else {
+                                                        LOGGER.error("Connection state problems. Got no status information.");
+                                                    }
+                                                }
+                                            });
+                                    }
+                                }, 60000, 60000);
+                            } else {
+                                // The connection request wasn't successful.
+                            }
+                        });
+                } else {
+                    // This device doesn't support tunneling ... do some error handling.
+                }
+            });
     }
 
     @Override
-    protected void decode(ChannelHandlerContext ctx, KNXNetIPMessage msg, List<Object> out) {
-        // Handle search responses to find the device able to provide tunneling services
-        if(msg instanceof SearchResponse) {
-            SearchResponse searchResponse = (SearchResponse) msg;
-            // Check if this device supports tunneling services.
-            final ServiceId tunnelingService = Arrays.stream(searchResponse.getDibSuppSvcFamilies().getServiceIds()).filter(serviceId -> serviceId instanceof KnxNetIpTunneling).findFirst().orElse(null);
-            // If this device supports this type of service, tell the driver, we found a suitable device.
-            if(tunnelingService != null) {
-                gatewayAddress = searchResponse.getDibDeviceInfo().getKnxAddress();
-                gatewayName = new String(searchResponse.getDibDeviceInfo().getDeviceFriendlyName()).trim();
-                LOGGER.info(String.format("Found KNX Gateway '%s' with KNX address '%d.%d.%d'", gatewayName,
-                    gatewayAddress.getMainGroup(), gatewayAddress.getMiddleGroup(), gatewayAddress.getSubGroup()));
-                ctx.channel().pipeline().fireUserEventTriggered(new KnxGatewayFoundEvent());
-            }
-        }
+    public void onDisconnect(ConversationContext<KNXNetIPMessage> context) {
+        // Cancel the timer for sending connection state requests.
+        connectionStateTimer.cancel();
 
-        // Handle the response to a connection request.
-        else if(msg instanceof ConnectionResponse) {
-            ConnectionResponse connectionResponse = (ConnectionResponse) msg;
-            Status status = connectionResponse.getStatus();
-            // Remember the communication channel id.
-            communicationChannelId = connectionResponse.getCommunicationChannelId();
-            if (status == Status.NO_ERROR) {
-                LOGGER.info(String.format("Connected to KNX Gateway '%s' with KNX address '%d.%d.%d'", gatewayName,
+        DisconnectRequest disconnectRequest = new DisconnectRequest(communicationChannelId,
+            new HPAIControlEndpoint(HostProtocolCode.IPV4_UDP, localIPAddress, localPort));
+        context.sendRequest(disconnectRequest)
+            .expectResponse(KNXNetIPMessage.class, Duration.ofMillis(1000))
+            .check(p -> p instanceof DisconnectResponse)
+            .unwrap(p -> (DisconnectResponse) p)
+            .handle(disconnectResponse -> {
+                // In general we should probably check if the disconnect was successful, but in
+                // the end we couldn't do much if the disconnect would fail.
+                LOGGER.info(String.format("Disconnected from KNX Gateway '%s' with KNX address '%d.%d.%d'", gatewayName,
                     gatewayAddress.getMainGroup(), gatewayAddress.getMiddleGroup(), gatewayAddress.getSubGroup()));
-                ctx.channel().pipeline().fireUserEventTriggered(new ConnectedEvent());
-                // Start a timer to check the connection state every 60 seconds.
-                connectionStateTimer = new Timer();
-                connectionStateTimer.scheduleAtFixedRate(new TimerTask() {
-                    @Override
-                    public void run() {
-                        ConnectionStateRequest connectionStateRequest =
-                            new ConnectionStateRequest(communicationChannelId,
-                                new HPAIControlEndpoint(HostProtocolCode.IPV4_UDP, localIPAddress, localPort));
-                        ctx.channel().writeAndFlush(connectionStateRequest);
-                    }
-                }, 60000, 60000);
-            } else {
-                LOGGER.error(String.format("Error connecting to KNX Gateway '%s' with KNX address '%d.%d.%d'", gatewayName,
-                    gatewayAddress.getMainGroup(), gatewayAddress.getMiddleGroup(), gatewayAddress.getSubGroup()));
-            }
-        }
 
-        // Handle the responses to the connection state requests.
-        else if(msg instanceof ConnectionStateResponse) {
-            ConnectionStateResponse connectionStateResponse = (ConnectionStateResponse) msg;
-            if(connectionStateResponse.getStatus() != Status.NO_ERROR) {
-                if(connectionStateResponse.getStatus() != null) {
-                    LOGGER.error(String.format("Connection state problems. Got %s",
-                        connectionStateResponse.getStatus().name()));
-                } else {
-                    LOGGER.error("Connection state problems. Got no status information.");
-                }
-            }
-        }
+                // Send an event that connection disconnect is complete.
+                context.fireDisconnected();
+            });
+    }
 
+    @Override
+    protected void decode(ConversationContext<KNXNetIPMessage> context, KNXNetIPMessage msg) throws Exception {
         // Handle a normal tunneling request, which is delivering KNX data.
-        else if(msg instanceof TunnelingRequest) {
+        if(msg instanceof TunnelingRequest) {
             TunnelingRequest tunnelingRequest = (TunnelingRequest) msg;
             final short curCommunicationChannelId =
                 tunnelingRequest.getTunnelingRequestDataBlock().getCommunicationChannelId();
+
             // Only if the communication channel id match, do anything with the request.
             if(curCommunicationChannelId == communicationChannelId) {
+                CEMIBusmonInd busmonInd = (CEMIBusmonInd) tunnelingRequest.getCemi();
+                if (busmonInd.getCemiFrame() instanceof CEMIFrameData) {
+                    CEMIFrameData cemiDataFrame = (CEMIFrameData) busmonInd.getCemiFrame();
+
+                    // The first byte is actually just 6 bits long, but we'll treat it as a full one.
+                    // So here we create a byte array containing the first and all the following bytes.
+                    byte[] payload = new byte[1 + cemiDataFrame.getData().length];
+                    payload[0] = cemiDataFrame.getDataFirstByte();
+                    System.arraycopy(cemiDataFrame.getData(), 0, payload, 1, cemiDataFrame.getData().length);
+
+                    final KNXAddress sourceAddress = cemiDataFrame.getSourceAddress();
+                    final byte[] destinationGroupAddress = cemiDataFrame.getDestinationAddress();
+
+                    // Decode the group address depending on the project settings.
+                    ReadBuffer addressReadBuffer = new ReadBuffer(destinationGroupAddress);
+                    KNXGroupAddress destinationAddress =
+                        KNXGroupAddressIO.parse(addressReadBuffer, groupAddressType);
+
+                    // If there is an ETS5 model provided, continue decoding the payload.
+                    if(ets5Model != null) {
+                        final GroupAddress groupAddress = ets5Model.getGroupAddresses().get(destinationAddress);
+
+                        ReadBuffer rawDataReader = new ReadBuffer(payload);
+
+                        final KnxDatapoint datapoint = KnxDatapointIO.parse(rawDataReader, groupAddress.getType().getMainType(), groupAddress.getType().getSubType());
+                        final String jsonDatapoint = datapoint.toString(ToStringStyle.JSON_STYLE);
+
+                        LOGGER.info("Message from: '" + KnxNetIpProtocolLogic.toString(sourceAddress) + "'" +
+                            " to: '" + KnxNetIpProtocolLogic.toString(destinationAddress) + "'" +
+                            "\n location: '" + groupAddress.getFunction().getSpaceName() + "'" +
+                            " function: '" + groupAddress.getFunction().getName() + "'" +
+                            " meaning: '" + groupAddress.getName() + "'" +
+                            " type: '" + groupAddress.getType().getName() + "'" +
+                            "\n value: '" + jsonDatapoint + "'"
+                        );
+                    }
+                    // Else just output the raw payload.
+                    else {
+                        LOGGER.info("Raw Message: '" + KnxNetIpProtocolLogic.toString(sourceAddress) + "'" +
+                            " to: '" + KnxNetIpProtocolLogic.toString(destinationAddress) + "'" +
+                            "\n payload: '" + Hex.encodeHexString(payload) + "'"
+                        );
+                    }
+                }
+
+                // Confirm receipt of the request.
                 final short sequenceCounter = tunnelingRequest.getTunnelingRequestDataBlock().getSequenceCounter();
                 TunnelingResponse tunnelingResponse = new TunnelingResponse(
                     new TunnelingResponseDataBlock(communicationChannelId, sequenceCounter, Status.NO_ERROR));
-                ctx.channel().writeAndFlush(tunnelingResponse);
-                out.add(tunnelingRequest);
+                context.sendToWire(tunnelingResponse);
             }
         }
+    }
 
-        // Handle the cleaning up after getting the response to a disconnect request.
-        else if(msg instanceof DisconnectResponse) {
-            // In general we should probably check if the disconnect was successful, but in
-            // the end we couldn't do much if the disconnect would fail.
-            ctx.channel().pipeline().fireUserEventTriggered(new DisconnectedEvent());
-            LOGGER.info(String.format("Disconnected from KNX Gateway '%s' with KNX address '%d.%d.%d'", gatewayName,
-                gatewayAddress.getMainGroup(), gatewayAddress.getMiddleGroup(), gatewayAddress.getSubGroup()));
-            // Notify the closer, that we're done disconnecting.
-            if(disconnectFuture != null) {
-                disconnectFuture.complete(null);
-            }
+    protected static String toString(KNXAddress knxAddress) {
+        return knxAddress.getMainGroup() + "." + knxAddress.getMiddleGroup() + "." + knxAddress.getSubGroup();
+    }
+
+    protected static String toString(KNXGroupAddress groupAddress) {
+        if(groupAddress instanceof KNXGroupAddress3Level) {
+            KNXGroupAddress3Level level3 = (KNXGroupAddress3Level) groupAddress;
+            return level3.getMainGroup() + "/" + level3.getMiddleGroup() + "/" + level3.getSubGroup();
+        } else if(groupAddress instanceof KNXGroupAddress2Level) {
+            KNXGroupAddress2Level level2 = (KNXGroupAddress2Level) groupAddress;
+            return level2.getMainGroup() + "/" + level2.getSubGroup();
+        } else if(groupAddress instanceof KNXGroupAddressFreeLevel) {
+            KNXGroupAddressFreeLevel free = (KNXGroupAddressFreeLevel) groupAddress;
+            return free.getSubGroup() + "";
         }
+        throw new RuntimeException("Unsupported Group Address Type " + groupAddress.getClass().getName());
     }
 
 }
diff --git a/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/protocol/KnxNetIpProtocolPackets.java b/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/protocol/KnxNetIpProtocolMessage.java
similarity index 95%
rename from sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/protocol/KnxNetIpProtocolPackets.java
rename to sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/protocol/KnxNetIpProtocolMessage.java
index a75296e..8078a3f 100644
--- a/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/protocol/KnxNetIpProtocolPackets.java
+++ b/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/protocol/KnxNetIpProtocolMessage.java
@@ -27,9 +27,9 @@ import org.apache.plc4x.java.utils.ParseException;
 import org.apache.plc4x.java.utils.ReadBuffer;
 import org.apache.plc4x.java.utils.WriteBuffer;
 
-public class KnxNetIpProtocolPackets extends GeneratedDriverByteToMessageCodec<KNXNetIPMessage> {
+public class KnxNetIpProtocolMessage extends GeneratedDriverByteToMessageCodec<KNXNetIPMessage> {
 
-    public KnxNetIpProtocolPackets() {
+    public KnxNetIpProtocolMessage() {
         super(new MessageIO<KNXNetIPMessage, KNXNetIPMessage>() {
             @Override
             public KNXNetIPMessage parse(ReadBuffer io) throws ParseException {
diff --git a/sandbox/test-java-knxnetip-driver/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIp.java b/sandbox/test-java-knxnetip-driver/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIp.java
index 0732417..cee09c1 100644
--- a/sandbox/test-java-knxnetip-driver/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIp.java
+++ b/sandbox/test-java-knxnetip-driver/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIp.java
@@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit;
 public class ManualKnxNetIp {
 
     public static void main(String[] args) throws Exception {
-        final PlcConnection connection = new PlcDriverManager().getConnection("knxnet-ip://192.168.42.11");
+        final PlcConnection connection = new PlcDriverManager().getConnection("knxnet-ip://192.168.42.11?knxprojFilePath=/Users/christofer.dutz/Projects/Apache/PLC4X-Documents/KNX/Stettiner%20Str.%2013/StettinerStr-Soll-Ist-Temperatur.knxproj");
         TimeUnit.SECONDS.sleep(300);
         connection.close();
     }
diff --git a/sandbox/test-java-knxnetip-driver/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIpWithEts5.java b/sandbox/test-java-knxnetip-driver/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIpWithEts5.java
index a743586..f94fb1e 100644
--- a/sandbox/test-java-knxnetip-driver/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIpWithEts5.java
+++ b/sandbox/test-java-knxnetip-driver/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIpWithEts5.java
@@ -62,7 +62,7 @@ public class ManualKnxNetIpWithEts5 {
         ChannelFactory channelFactory = new UdpSocketChannelFactory(
             gatewayInetAddress, KnxNetIpConnection.KNXNET_IP_PORT);
 
-        NettyPlcConnection connection = new KnxNetIpConnection(channelFactory, "",
+        NettyPlcConnection connection = new KnxNetIpConnection(channelFactory, ""/*,
             new PlcMessageToMessageCodec<KNXNetIPMessage, PlcRequestContainer>() {
                 @Override
                 protected void encode(ChannelHandlerContext ctx, PlcRequestContainer msg, List<Object> out) throws Exception {
@@ -130,7 +130,7 @@ public class ManualKnxNetIpWithEts5 {
             } catch (PlcConnectionException e) {
                 // Just ignore this.
             }
-        }));
+        })*/);
     }
 
     protected static String toString(KNXAddress knxAddress) {
diff --git a/sandbox/test-java-knxnetip-driver/src/test/resources/logback.xml b/sandbox/test-java-knxnetip-driver/src/test/resources/logback.xml
new file mode 100644
index 0000000..27d40c0
--- /dev/null
+++ b/sandbox/test-java-knxnetip-driver/src/test/resources/logback.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+-->
+<configuration xmlns="http://ch.qos.logback/xml/ns/logback"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="http://ch.qos.logback/xml/ns/logback https://raw.githubusercontent.com/enricopulatzo/logback-XSD/master/src/main/xsd/logback.xsd">
+
+  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+    <!-- encoders are assigned the type
+         ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+    <encoder>
+      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+    </encoder>
+  </appender>
+
+  <root level="info">
+    <appender-ref ref="STDOUT" />
+  </root>
+
+</configuration>
\ No newline at end of file
diff --git a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/S7Driver.java b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/S7Driver.java
index 1d578a0..d61fc03 100644
--- a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/S7Driver.java
+++ b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/S7Driver.java
@@ -33,7 +33,7 @@ import java.util.regex.Pattern;
 @Component(service = PlcDriver.class, immediate = true)
 public class S7Driver implements PlcDriver {
 
-    private static final Pattern S7_URI_PATTERN = Pattern.compile("^s7ng://(?<host>.*)(?<params>\\?.*)?");
+    private static final Pattern S7_URI_PATTERN = Pattern.compile("^s7ng://(?<host>.*)(\\??<params>.*)?");
 
     @Override
     public String getProtocolCode() {
diff --git a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7Configuration.java b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7Configuration.java
index ed9ef68..20adeec 100644
--- a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7Configuration.java
+++ b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7Configuration.java
@@ -37,7 +37,8 @@ public class S7Configuration {
     @ConfigurationParameter
     public String controllerType;
 
-    @Override public String toString() {
+    @Override
+    public String toString() {
         return "Configuration{" +
             "rack=" + rack +
             ", slot=" + slot +
@@ -47,4 +48,5 @@ public class S7Configuration {
             ", controllerType='" + controllerType + '\'' +
             '}';
     }
+
 }
diff --git a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7Connection.java b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7Connection.java
index a525833..5546d09 100644
--- a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7Connection.java
+++ b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7Connection.java
@@ -21,7 +21,6 @@ package org.apache.plc4x.java.s7.readwrite.connection;
 import io.netty.channel.*;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcWriteRequest;
-import org.apache.plc4x.java.api.messages.PlcWriteResponse;
 import org.apache.plc4x.java.spi.connection.ChannelFactory;
 import org.apache.plc4x.java.spi.connection.NettyPlcConnection;
 import org.apache.plc4x.java.spi.events.ConnectEvent;
@@ -34,10 +33,7 @@ import org.apache.plc4x.java.spi.Plc4xNettyWrapper;
 import org.apache.plc4x.java.spi.Plc4xProtocolBase;
 import org.apache.plc4x.java.spi.messages.DefaultPlcReadRequest;
 import org.apache.plc4x.java.spi.messages.DefaultPlcWriteRequest;
-import org.apache.plc4x.java.spi.messages.InternalPlcWriteRequest;
-import org.apache.plc4x.java.spi.messages.InternalPlcWriteResponse;
 import org.apache.plc4x.java.spi.messages.PlcReader;
-import org.apache.plc4x.java.spi.messages.PlcRequestContainer;
 import org.apache.plc4x.java.spi.messages.PlcWriter;
 import org.apache.plc4x.java.spi.parser.ConnectionParser;
 import org.apache.plc4x.java.tcp.connection.TcpSocketChannelFactory;
@@ -64,8 +60,6 @@ public class S7Connection extends NettyPlcConnection implements PlcReader, PlcWr
 
         configuration = ConnectionParser.parse("a://1.1.1.1/" + params, S7Configuration.class);
 
-
-
         logger.info("Setting up S7 Connection with Configuration: {}", configuration);
     }
 
@@ -81,8 +75,6 @@ public class S7Connection extends NettyPlcConnection implements PlcReader, PlcWr
 
     @Override
     protected ChannelHandler getChannelHandler(CompletableFuture<Void> sessionSetupCompleteFuture) {
-
-
         return new ChannelInitializer<Channel>() {
             @Override
             protected void initChannel(Channel channel) {
@@ -118,27 +110,10 @@ public class S7Connection extends NettyPlcConnection implements PlcReader, PlcWr
     }
 
     @Override
-    public CompletableFuture<PlcWriteResponse> write(PlcWriteRequest writeRequest) {
-        InternalPlcWriteRequest internalWriteRequest = checkInternal(writeRequest, InternalPlcWriteRequest.class);
-        CompletableFuture<InternalPlcWriteResponse> future = new CompletableFuture<>();
-        PlcRequestContainer<InternalPlcWriteRequest, InternalPlcWriteResponse> container =
-            new PlcRequestContainer<>(internalWriteRequest, future);
-        channel.writeAndFlush(container).addListener(f -> {
-            if (!f.isSuccess()) {
-                future.completeExceptionally(f.cause());
-            }
-        });
-        return future
-            .thenApply(PlcWriteResponse.class::cast);
-    }
-
-    @Override
     protected void sendChannelCreatedEvent() {
         logger.trace("Channel was created, firing ChannelCreated Event");
         // Send an event to the pipeline telling the Protocol filters what's going on.
         channel.pipeline().fireUserEventTriggered(new ConnectEvent());
     }
 
-
-
 }
diff --git a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
index 1dda2fb..f569c31 100644
--- a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
+++ b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
@@ -87,21 +87,6 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> {
         this.maxAmqCallee = configuration.maxAmqCallee;
     }
 
-    /**
-     * Iterate over all values until one is found that the given tpdu size will fit.
-     *
-     * @param tpduSizeParameter requested tpdu size.
-     * @return smallest {@link COTPTpduSize} which will fit a given size of tpdu.
-     */
-    protected COTPTpduSize getNearestMatchingTpduSize(short tpduSizeParameter) {
-        for (COTPTpduSize value : COTPTpduSize.values()) {
-            if (value.getSizeInBytes() >= tpduSizeParameter) {
-                return value;
-            }
-        }
-        return null;
-    }
-
     @Override
     public void onConnect(ConversationContext<TPKTPacket> context) {
         logger.debug("ISO Transport Protocol Sending Connection Request");
@@ -619,4 +604,19 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> {
             s7Field.getMemoryArea(), s7Field.getByteOffset(), s7Field.getBitOffset());
     }
 
+    /**
+     * Scan the {@link COTPTpduSize} values in declaration order and pick the first one large enough for the requested size.
+     *
+     * @param tpduSizeParameter requested tpdu size, compared against {@code getSizeInBytes()} of each value.
+     * @return first {@link COTPTpduSize} whose size in bytes is at least the requested size, or {@code null} if none is (this is the smallest fit only if the enum is declared in ascending size order - TODO confirm).
+     */
+    protected COTPTpduSize getNearestMatchingTpduSize(short tpduSizeParameter) {
+        for (COTPTpduSize value : COTPTpduSize.values()) {
+            if (value.getSizeInBytes() >= tpduSizeParameter) {
+                return value;
+            }
+        }
+        return null;
+    }
+
 }
diff --git a/sandbox/test-streampipes-plc4x-adapters/src/main/java/org/apache/plc4x/java/streampipes/adapters/source/knxnetip/KnxNetIpAdapter.java b/sandbox/test-streampipes-plc4x-adapters/src/main/java/org/apache/plc4x/java/streampipes/adapters/source/knxnetip/KnxNetIpAdapter.java
index 3fddb36..e885b33 100644
--- a/sandbox/test-streampipes-plc4x-adapters/src/main/java/org/apache/plc4x/java/streampipes/adapters/source/knxnetip/KnxNetIpAdapter.java
+++ b/sandbox/test-streampipes-plc4x-adapters/src/main/java/org/apache/plc4x/java/streampipes/adapters/source/knxnetip/KnxNetIpAdapter.java
@@ -138,7 +138,7 @@ public class KnxNetIpAdapter extends SpecificDataStreamAdapter {
             InetAddress inetAddress = InetAddress.getByName(gatewayIp);
             ChannelFactory channelFactory = new UdpSocketChannelFactory(inetAddress, KnxNetIpConnection.KNXNET_IP_PORT);
 
-            connection = new KnxNetIpConnection(channelFactory, "",
+            connection = new KnxNetIpConnection(channelFactory, ""/*,
                 new PlcMessageToMessageCodec<KNXNetIPMessage, PlcRequestContainer>() {
 
                 @Override
@@ -183,7 +183,7 @@ public class KnxNetIpAdapter extends SpecificDataStreamAdapter {
                 protected void encode(ChannelHandlerContext ctx, PlcRequestContainer msg, List<Object> out) {
                     // Ignore this as we don't send anything.
                 }
-            });
+            }*/);
             connection.connect();
         } catch (PlcConnectionException e) {
             logger.error("An error occurred starting the BACnet/IP driver", e);