You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by jf...@apache.org on 2019/12/24 08:42:53 UTC

[plc4x] branch next-gen-core updated: Refactoring towards Generic Connections Part 2: - Added Another Method to ChannelFactory interface. Ported S7-NG and KNX to the new ChannelFactory Method. - Extended Connection Parser to read all Parts of the Connection String and use it in KNX and S7-NG.

This is an automated email from the ASF dual-hosted git repository.

jfeinauer pushed a commit to branch next-gen-core
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/next-gen-core by this push:
     new 512e93b  Refactoring towards Generic Connections Part 2: - Added Another Method to ChannelFactory interface. Ported S7-NG and KNX to the new ChannelFactory Method. - Extended Connection Parser to read all Parts of the Connection String and use it in KNX and S7-NG.
512e93b is described below

commit 512e93b3a8ef2a5f302d85449a742df2fc0fb07a
Author: Julian Feinauer <j....@pragmaticminds.de>
AuthorDate: Tue Dec 24 09:42:42 2019 +0100

    Refactoring towards Generic Connections Part 2:
    - Added Another Method to ChannelFactory interface. Ported S7-NG and KNX to the new ChannelFactory Method.
    - Extended Connection Parser to read all Parts of the Connection String and use it in KNX and S7-NG.
---
 .../plc4x/java/spi/connection/ChannelFactory.java  |  17 ++-
 ...nection.java => DefaultNettyPlcConnection.java} |  18 +--
 .../java/spi/connection/NettyChannelFactory.java   |  98 +++++++++++++++
 .../plc4x/java/spi/parser/ConnectionParser.java    |  67 ++++++++++-
 .../java/spi/parser/ConnectionParserTest.java      |  18 ++-
 .../tcp/connection/TcpSocketChannelFactory.java    |  12 ++
 .../base/connection/UdpSocketChannelFactory.java   |  12 ++
 .../connection/AdsAbstractPlcConnection.java       |   5 +-
 sandbox/test-java-knxnetip-driver/pom.xml          |   6 +
 .../apache/plc4x/java/knxnetip/KnxNetIpDriver.java |  47 +++-----
 .../knxnetip/connection/KnxNetIpConnection.java    | 134 ---------------------
 .../java/knxnetip/ManualKnxNetIpWithEts5.java      |  17 +--
 .../apache/plc4x/java/s7/readwrite/S7Driver.java   |  46 ++++---
 .../java/s7/readwrite/connection/S7Connection.java |  67 -----------
 .../adapters/source/knxnetip/KnxNetIpAdapter.java  |  20 +--
 15 files changed, 286 insertions(+), 298 deletions(-)

diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/ChannelFactory.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/ChannelFactory.java
index e266b7c..d05bc2e 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/ChannelFactory.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/ChannelFactory.java
@@ -20,13 +20,26 @@ package org.apache.plc4x.java.spi.connection;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandler;
+import org.apache.commons.lang3.NotImplementedException;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.exceptions.PlcException;
 
+import java.net.SocketAddress;
+
 public interface ChannelFactory {
 
-    Channel createChannel(ChannelHandler channelHandler)
-        throws PlcConnectionException;
+    /**
+     * Will likely be removed soon?
+     */
+    @Deprecated
+    Channel createChannel(ChannelHandler channelHandler) throws PlcConnectionException;
+
+    /**
+     * Will be the future interface to Crate Channels.
+     */
+    default Channel createChannel(SocketAddress socketAddress, ChannelHandler channelHandler) throws PlcConnectionException {
+        throw new NotImplementedException("");
+    }
 
     void ping() throws PlcException;
 
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/GenericNettyPlcConnection.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/DefaultNettyPlcConnection.java
similarity index 90%
rename from plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/GenericNettyPlcConnection.java
rename to plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/DefaultNettyPlcConnection.java
index fc3479a..6fbbff1 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/GenericNettyPlcConnection.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/DefaultNettyPlcConnection.java
@@ -41,14 +41,14 @@ import org.slf4j.LoggerFactory;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
-public abstract class GenericNettyPlcConnection<BASE_PROTOCOL_CLASS extends Message> extends AbstractPlcConnection {
+public class DefaultNettyPlcConnection<BASE_PROTOCOL_CLASS extends Message> extends AbstractPlcConnection {
 
     /**
      * a {@link HashedWheelTimer} shall be only instantiated once.
      */
     // TODO: maybe find a way to make this configurable per jvm
     protected final static Timer timer = new HashedWheelTimer();
-    private static final Logger logger = LoggerFactory.getLogger(GenericNettyPlcConnection.class);
+    private static final Logger logger = LoggerFactory.getLogger(DefaultNettyPlcConnection.class);
     protected final ChannelFactory channelFactory;
 
     protected final boolean awaitSessionSetupComplete;
@@ -58,27 +58,27 @@ public abstract class GenericNettyPlcConnection<BASE_PROTOCOL_CLASS extends Mess
     private GeneratedDriverByteToMessageCodec<BASE_PROTOCOL_CLASS> messageCodec;
     private Plc4xProtocolBase<BASE_PROTOCOL_CLASS> protocolLogic;
 
-    protected GenericNettyPlcConnection(ChannelFactory channelFactory) {
+    protected DefaultNettyPlcConnection(ChannelFactory channelFactory) {
         this(channelFactory, false);
     }
 
-    protected GenericNettyPlcConnection(ChannelFactory channelFactory, boolean awaitSessionSetupComplete) {
+    protected DefaultNettyPlcConnection(ChannelFactory channelFactory, boolean awaitSessionSetupComplete) {
         this.channelFactory = channelFactory;
         this.awaitSessionSetupComplete = awaitSessionSetupComplete;
         this.connected = false;
     }
 
-    protected GenericNettyPlcConnection(ChannelFactory channelFactory, boolean awaitSessionSetupComplete, PlcFieldHandler handler) {
+    protected DefaultNettyPlcConnection(ChannelFactory channelFactory, boolean awaitSessionSetupComplete, PlcFieldHandler handler) {
         super(true, true, false, handler);
         this.channelFactory = channelFactory;
         this.awaitSessionSetupComplete = awaitSessionSetupComplete;
         this.connected = false;
     }
 
-    protected GenericNettyPlcConnection(ChannelFactory channelFactory, boolean awaitSessionSetupComplete, PlcFieldHandler handler,
-                                        Class<BASE_PROTOCOL_CLASS> baseProtocolClass,
-                                        GeneratedDriverByteToMessageCodec<BASE_PROTOCOL_CLASS> messageCodec,
-                                        Plc4xProtocolBase<BASE_PROTOCOL_CLASS> protocolLogic) {
+    public DefaultNettyPlcConnection(ChannelFactory channelFactory, boolean awaitSessionSetupComplete, PlcFieldHandler handler,
+                                     Class<BASE_PROTOCOL_CLASS> baseProtocolClass,
+                                     GeneratedDriverByteToMessageCodec<BASE_PROTOCOL_CLASS> messageCodec,
+                                     Plc4xProtocolBase<BASE_PROTOCOL_CLASS> protocolLogic) {
         this(channelFactory, awaitSessionSetupComplete, handler);
         this.baseProtocolClass = baseProtocolClass;
         this.messageCodec = messageCodec;
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/NettyChannelFactory.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/NettyChannelFactory.java
new file mode 100644
index 0000000..bb5cf68
--- /dev/null
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/NettyChannelFactory.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.spi.connection;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoop;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.exceptions.PlcException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.SocketAddress;
+
+/**
+ * Adapter with sensible defaults for a Netty Based Channel Factory.
+ */
+public abstract class NettyChannelFactory implements ChannelFactory {
+
+    private static final Logger logger = LoggerFactory.getLogger(NettyChannelFactory.class);
+
+    /**
+     * Channel to Use, e.g. NiO, EiO
+     */
+    public abstract Class<? extends Channel> getChannel();
+
+    /**
+     * Event Loop Group to use.
+     * Has to be in accordance with {@link #getChannel()}
+     * otherwise a Runtime Exception will be produced by Netty
+     */
+    public abstract EventLoopGroup getEventLoopGroup();
+
+    @Override public Channel createChannel(SocketAddress socketAddress, ChannelHandler channelHandler) throws PlcConnectionException {
+        try {
+            final EventLoopGroup workerGroup = getEventLoopGroup();
+
+            Bootstrap bootstrap = new Bootstrap();
+            bootstrap.group(workerGroup);
+            bootstrap.channel(getChannel());
+            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
+            bootstrap.option(ChannelOption.TCP_NODELAY, true);
+            // TODO we should use an explicit (configurable?) timeout here
+            // bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
+            bootstrap.handler(channelHandler);
+            // Start the client.
+            final ChannelFuture f = bootstrap.connect(socketAddress);
+            f.addListener(new GenericFutureListener<Future<? super Void>>() {
+                @Override public void operationComplete(Future<? super Void> future) throws Exception {
+                    if (!future.isSuccess()) {
+                        logger.info("Unable to connect, shutting down worker thread.");
+                        workerGroup.shutdownGracefully();
+                    }
+                }
+            });
+            // Wait for sync
+            f.sync();
+            f.awaitUninterruptibly(); // jf: unsure if we need that
+            // Wait till the session is finished initializing.
+            return f.channel();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PlcConnectionException("Error creating channel.", e);
+        } catch (Exception e) {
+            throw new PlcConnectionException("Error creating channel.", e);
+        }
+    }
+
+    @Override public void ping() throws PlcException {
+
+    }
+}
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/parser/ConnectionParser.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/parser/ConnectionParser.java
index 244a1f8..f02e61a 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/parser/ConnectionParser.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/parser/ConnectionParser.java
@@ -21,10 +21,16 @@ package org.apache.plc4x.java.spi.parser;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
 
 import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.UnknownHostException;
 import java.util.AbstractMap;
 import java.util.Arrays;
 import java.util.Collections;
@@ -38,10 +44,60 @@ import java.util.stream.Collectors;
 import static java.util.stream.Collectors.mapping;
 import static java.util.stream.Collectors.toList;
 
+/**
+ * A query contains for our cases mostly of three parts
+ * - protocol identifier
+ * - connection address (ip/port), serial port, ...
+ * - path parameters
+ */
 public class ConnectionParser {
 
+    private final String connectionString;
+    private URI uri;
+
+    public ConnectionParser(String protocolCode, String connectionString) throws PlcConnectionException {
+        this.connectionString = connectionString;
+        try {
+            this.uri = new URI(connectionString);
+        } catch (URISyntaxException e) {
+            throw new PlcConnectionException("Unable to parse connection string '" + connectionString + "'", e);
+        }
+        if (!protocolCode.equals(uri.getScheme())) {
+            throw new PlcConnectionException("The given Connection String does not match the expected Protocol '" + protocolCode + "'");
+        }
+    }
+
+    public SocketAddress getSocketAddress() {
+        return this.getSocketAddress(-1);
+    }
+
+    /**
+     * Convenvience Method, as its sometimes allowed to omit port in the URI String, as its
+     * default for some protocols.
+     * Of course only makes sense for TCP based Protocols
+     *
+     * @param defaultPort Default Port
+     * @return Valid InetSocketAddress
+     */
+    public SocketAddress getSocketAddress(int defaultPort) {
+        try {
+            String hostString = uri.getHost();
+            int port = uri.getPort();
+            if (port == -1) {
+                if (defaultPort == -1) {
+                    throw new PlcRuntimeException("No port given in URI String and no default Port given!");
+                } else {
+                    port = defaultPort;
+                }
+            }
+            return new InetSocketAddress(InetAddress.getByName(hostString), port);
+        } catch (UnknownHostException e) {
+            throw new PlcRuntimeException("Unable to resolve Host in connection  string '" + connectionString + "'", e);
+        }
+    }
+
     // TODO Respect Path Params
-    public static <T> T parse(String string, Class<T> pClazz) {
+    public <T> T createConfiguration(Class<T> pClazz) {
         Map<String, Field> fieldMap = Arrays.stream(FieldUtils.getAllFields(pClazz))
             .filter(field -> field.getAnnotation(ConfigurationParameter.class) != null)
             .collect(Collectors.toMap(Field::getName, Function.identity()));
@@ -50,11 +106,10 @@ public class ConnectionParser {
         try {
             instance = pClazz.newInstance();
         } catch (InstantiationException | IllegalAccessException e) {
-            throw new IllegalStateException();
+            throw new IllegalArgumentException("Unable to Instantiate Configuration Class", e);
         }
         try {
-            URI url = new URI(string);
-            Map<String, List<String>> stringListMap = splitQuery(url);
+            Map<String, List<String>> stringListMap = splitQuery(uri);
 
             // TODO notify on unmatched parameters
 
@@ -98,8 +153,8 @@ public class ConnectionParser {
             if (missingFields.size() > 0) {
                 throw new IllegalArgumentException("Missing required fields: " + missingFields);
             }
-        } catch (URISyntaxException | IllegalAccessException e) {
-            e.printStackTrace();
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("Unable to access all fields from Configuration Class '" + pClazz.getSimpleName() + "'", e);
         }
         return instance;
     }
diff --git a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/parser/ConnectionParserTest.java b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/parser/ConnectionParserTest.java
index 22a31b8..eb01835 100644
--- a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/parser/ConnectionParserTest.java
+++ b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/parser/ConnectionParserTest.java
@@ -19,21 +19,33 @@
 
 package org.apache.plc4x.java.spi.parser;
 
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.junit.jupiter.api.Test;
 
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 class ConnectionParserTest {
 
     @Test
-    void parse() {
-        ConnectionParser parser = new ConnectionParser();
-        PropertiesDescriptor properties = parser.parse("s7://192.168.167.1?rackId=1", PropertiesDescriptor.class);
+    void parse() throws PlcConnectionException {
+        ConnectionParser parser = new ConnectionParser("s7", "s7://192.168.167.1?rackId=1");
+        PropertiesDescriptor properties = parser.createConfiguration(PropertiesDescriptor.class);
 
         assertEquals(1, properties.rackId);
         assertEquals(1, properties.slotId);
     }
 
+    @Test
+    void parseHost() throws PlcConnectionException {
+        ConnectionParser parser = new ConnectionParser("s7", "s7://192.168.167.1?rackId=1");
+        SocketAddress inetSocketAddress = parser.getSocketAddress(102);
+
+        assertEquals(new InetSocketAddress("192.168.167.1", 102), inetSocketAddress);
+    }
+
     static class PropertiesDescriptor {
 
         @ConfigurationParameter("rackId")
diff --git a/plc4j/transports/tcp/src/main/java/org/apache/plc4x/java/tcp/connection/TcpSocketChannelFactory.java b/plc4j/transports/tcp/src/main/java/org/apache/plc4x/java/tcp/connection/TcpSocketChannelFactory.java
index 50ee955..b6add55 100644
--- a/plc4j/transports/tcp/src/main/java/org/apache/plc4x/java/tcp/connection/TcpSocketChannelFactory.java
+++ b/plc4j/transports/tcp/src/main/java/org/apache/plc4x/java/tcp/connection/TcpSocketChannelFactory.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.net.SocketAddress;
 
 public class TcpSocketChannelFactory implements ChannelFactory {
 
@@ -46,11 +47,22 @@ public class TcpSocketChannelFactory implements ChannelFactory {
     private final InetAddress address;
     private final int port;
 
+    /**
+     * @deprecated the next-gen drivers should use the {@link #TcpSocketChannelFactory(SocketAddress)}
+     * constructor.
+     */
+    @Deprecated
     public TcpSocketChannelFactory(InetAddress address, int port) {
         this.address = address;
         this.port = port;
     }
 
+    public TcpSocketChannelFactory(SocketAddress address) {
+        assert address instanceof InetSocketAddress;
+        this.address = ((InetSocketAddress) address).getAddress();
+        this.port = ((InetSocketAddress) address).getPort();
+    }
+
     @Override
     public Channel createChannel(ChannelHandler channelHandler)
         throws PlcConnectionException {
diff --git a/plc4j/transports/udp/src/main/java/org/apache/plc4x/java/base/connection/UdpSocketChannelFactory.java b/plc4j/transports/udp/src/main/java/org/apache/plc4x/java/base/connection/UdpSocketChannelFactory.java
index 22dbe86..e303fd1 100644
--- a/plc4j/transports/udp/src/main/java/org/apache/plc4x/java/base/connection/UdpSocketChannelFactory.java
+++ b/plc4j/transports/udp/src/main/java/org/apache/plc4x/java/base/connection/UdpSocketChannelFactory.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.net.SocketAddress;
 
 public class UdpSocketChannelFactory implements ChannelFactory {
 
@@ -45,11 +46,22 @@ public class UdpSocketChannelFactory implements ChannelFactory {
     private final InetAddress address;
     private final int port;
 
+    /**
+     * @deprecated the next-gen drivers should use the {@link #UdpSocketChannelFactory(SocketAddress)}
+     * constructor.
+     */
+    @Deprecated
     public UdpSocketChannelFactory(InetAddress address, int port) {
         this.address = address;
         this.port = port;
     }
 
+    public UdpSocketChannelFactory(SocketAddress address) {
+        assert address instanceof InetSocketAddress;
+        this.address = ((InetSocketAddress) address).getAddress();
+        this.port = ((InetSocketAddress) address).getPort();
+    }
+
     @Override
     public Channel createChannel(ChannelHandler channelHandler)
         throws PlcConnectionException {
diff --git a/sandbox/test-java-amsads-driver/src/main/java/org/apache/plc4x/java/amsads/connection/AdsAbstractPlcConnection.java b/sandbox/test-java-amsads-driver/src/main/java/org/apache/plc4x/java/amsads/connection/AdsAbstractPlcConnection.java
index 5bb766b..a60e914 100644
--- a/sandbox/test-java-amsads-driver/src/main/java/org/apache/plc4x/java/amsads/connection/AdsAbstractPlcConnection.java
+++ b/sandbox/test-java-amsads-driver/src/main/java/org/apache/plc4x/java/amsads/connection/AdsAbstractPlcConnection.java
@@ -29,8 +29,7 @@ import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
 import org.apache.plc4x.java.api.messages.*;
 import org.apache.plc4x.java.spi.connection.ChannelFactory;
-import org.apache.plc4x.java.spi.connection.GenericNettyPlcConnection;
-import org.apache.plc4x.java.spi.connection.NettyPlcConnection;
+import org.apache.plc4x.java.spi.connection.DefaultNettyPlcConnection;
 import org.apache.plc4x.java.spi.messages.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,7 +38,7 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.concurrent.*;
 
-public abstract class AdsAbstractPlcConnection extends GenericNettyPlcConnection implements PlcReader, PlcWriter, PlcProprietarySender {
+public abstract class AdsAbstractPlcConnection extends DefaultNettyPlcConnection implements PlcReader, PlcWriter, PlcProprietarySender {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(AdsAbstractPlcConnection.class);
 
diff --git a/sandbox/test-java-knxnetip-driver/pom.xml b/sandbox/test-java-knxnetip-driver/pom.xml
index 799cd1f..0ac40da 100644
--- a/sandbox/test-java-knxnetip-driver/pom.xml
+++ b/sandbox/test-java-knxnetip-driver/pom.xml
@@ -125,6 +125,12 @@
       <!-- Scope is 'provided' as this way it's not shipped with the driver -->
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.plc4x</groupId>
+      <artifactId>plc4j-transport-tcp</artifactId>
+      <version>0.6.1-SNAPSHOT</version>
+      <scope>compile</scope>
+    </dependency>
   </dependencies>
 
 </project>
diff --git a/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/KnxNetIpDriver.java b/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/KnxNetIpDriver.java
index 2678a8a..9ef6e2b 100644
--- a/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/KnxNetIpDriver.java
+++ b/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/KnxNetIpDriver.java
@@ -21,13 +21,22 @@ package org.apache.plc4x.java.knxnetip;
 import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.authentication.PlcAuthentication;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.knxnetip.connection.KnxNetIpConnection;
+import org.apache.plc4x.java.base.connection.UdpSocketChannelFactory;
+import org.apache.plc4x.java.knxnetip.connection.KnxNetIpConfiguration;
 import org.apache.plc4x.java.api.PlcDriver;
+import org.apache.plc4x.java.knxnetip.connection.KnxNetIpFieldHandler;
+import org.apache.plc4x.java.knxnetip.protocol.KnxNetIpProtocolLogic;
+import org.apache.plc4x.java.knxnetip.protocol.KnxNetIpProtocolMessage;
+import org.apache.plc4x.java.knxnetip.readwrite.KNXNetIPMessage;
+import org.apache.plc4x.java.spi.connection.DefaultNettyPlcConnection;
+import org.apache.plc4x.java.spi.parser.ConnectionParser;
 
 import java.net.*;
 
 public class KnxNetIpDriver implements PlcDriver {
 
+    public static final int KNXNET_IP_PORT = 3671;
+
     @Override
     public String getProtocolCode() {
         return "knxnet-ip";
@@ -40,32 +49,16 @@ public class KnxNetIpDriver implements PlcDriver {
 
     @Override
     public PlcConnection connect(String connectionString) throws PlcConnectionException {
-        URL url;
-        try {
-            url = new URL(null, connectionString, new URLStreamHandler() {
-                @Override
-                protected URLConnection openConnection(URL u) {
-                    return null;
-                }
-            });
-        } catch (MalformedURLException e) {
-            throw new PlcConnectionException("Error parsing connection string " + connectionString, e);
-        }
-
-        try {
-            InetAddress serverInetAddress = InetAddress.getByName(url.getHost());
-            PlcConnection connection = new KnxNetIpConnection(serverInetAddress, url.getQuery());
-            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-                try {
-                    connection.close();
-                } catch (Exception e) {
-                    // Ignore this ...
-                }
-            }));
-            return connection;
-        } catch (Exception e) {
-            throw new PlcConnectionException("Error connecting to host", e);
-        }
+        ConnectionParser parser = new ConnectionParser(getProtocolCode(), connectionString);
+        KnxNetIpConfiguration configuration = parser.createConfiguration(KnxNetIpConfiguration.class);
+        SocketAddress address = parser.getSocketAddress(KNXNET_IP_PORT);
+        return new DefaultNettyPlcConnection<>(new UdpSocketChannelFactory(address), true, new KnxNetIpFieldHandler(),
+            KNXNetIPMessage.class,
+            new KnxNetIpProtocolMessage(),
+            new KnxNetIpProtocolLogic(
+                configuration
+            )
+        );
     }
 
     @Override
diff --git a/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/connection/KnxNetIpConnection.java b/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/connection/KnxNetIpConnection.java
deleted file mode 100644
index cd0b941..0000000
--- a/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/connection/KnxNetIpConnection.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.plc4x.java.knxnetip.connection;
-
-import io.netty.channel.*;
-import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.api.messages.PlcReadRequest;
-import org.apache.plc4x.java.api.model.PlcField;
-import org.apache.plc4x.java.base.connection.UdpSocketChannelFactory;
-import org.apache.plc4x.java.base.connection.protocol.DatagramUnpackingHandler;
-import org.apache.plc4x.java.knxnetip.readwrite.KNXNetIPMessage;
-import org.apache.plc4x.java.spi.Plc4xNettyWrapper;
-import org.apache.plc4x.java.spi.Plc4xProtocolBase;
-import org.apache.plc4x.java.spi.connection.ChannelFactory;
-import org.apache.plc4x.java.spi.connection.NettyPlcConnection;
-import org.apache.plc4x.java.spi.events.ConnectEvent;
-import org.apache.plc4x.java.spi.events.ConnectedEvent;
-import org.apache.plc4x.java.spi.events.DisconnectEvent;
-import org.apache.plc4x.java.knxnetip.model.KnxNetIpField;
-import org.apache.plc4x.java.knxnetip.protocol.KnxNetIpProtocolLogic;
-import org.apache.plc4x.java.knxnetip.protocol.KnxNetIpProtocolMessage;
-import org.apache.plc4x.java.spi.messages.*;
-import org.apache.plc4x.java.spi.parser.ConnectionParser;
-
-import java.net.InetAddress;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-public class KnxNetIpConnection extends NettyPlcConnection implements PlcReader {
-
-    public static final int KNXNET_IP_PORT = 3671;
-
-    private final KnxNetIpConfiguration configuration;
-
-    public KnxNetIpConnection(InetAddress address, String params) {
-        this(new UdpSocketChannelFactory(address, KNXNET_IP_PORT), params);
-    }
-
-    public KnxNetIpConnection(ChannelFactory channelFactory, String params) {
-        super(channelFactory, true);
-        configuration = ConnectionParser.parse("a://1.1.1.1?" + params, KnxNetIpConfiguration.class);
-    }
-
-    @Override
-    public PlcField prepareField(String fieldQuery) {
-        return KnxNetIpField.of(fieldQuery);
-    }
-
-    @Override
-    protected ChannelHandler getChannelHandler(CompletableFuture<Void> sessionSetupCompleteFuture) {
-        return new ChannelInitializer<Channel>() {
-            @Override
-            protected void initChannel(Channel channel) {
-                // Build the protocol stack for communicating with the s7 protocol.
-                ChannelPipeline pipeline = channel.pipeline();
-                pipeline.addLast(new ChannelInboundHandlerAdapter() {
-                    @Override
-                    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
-                        if (evt instanceof ConnectedEvent) {
-                            sessionSetupCompleteFuture.complete(null);
-                        } else {
-                            super.userEventTriggered(ctx, evt);
-                        }
-                    }
-                });
-                // Unpack the ByteBuf included in the DatagramPackage.
-                pipeline.addLast(new DatagramUnpackingHandler());
-                pipeline.addLast(new KnxNetIpProtocolMessage());
-
-                Plc4xProtocolBase<KNXNetIPMessage> knxNetIpProtocolLogic = new KnxNetIpProtocolLogic(configuration);
-                setProtocol(knxNetIpProtocolLogic);
-                Plc4xNettyWrapper<KNXNetIPMessage> context =
-                    new Plc4xNettyWrapper<>(pipeline, knxNetIpProtocolLogic, KNXNetIPMessage.class);
-                pipeline.addLast(context);
-            }
-        };
-    }
-
-    @Override
-    public boolean canRead() {
-        return true;
-    }
-
-    @Override
-    public PlcReadRequest.Builder readRequestBuilder() {
-        return new DefaultPlcReadRequest.Builder(this, new KnxNetIpFieldHandler());
-    }
-
-    @Override
-    public void close() throws PlcConnectionException {
-        if(channel == null) {
-            super.close();
-            return;
-        }
-
-        CompletableFuture<Void> disconnectFuture = new CompletableFuture<>();
-        channel.pipeline().fireUserEventTriggered(new DisconnectEvent(disconnectFuture));
-        try {
-            // Wait for the connection to be disconnected.
-            disconnectFuture.get(500, TimeUnit.MILLISECONDS);
-            super.close();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PlcConnectionException("Error closing connection");
-        } catch (ExecutionException | TimeoutException e) {
-            throw new PlcConnectionException("Error closing connection");
-        }
-    }
-
-    @Override
-    protected void sendChannelCreatedEvent() {
-        // Send an event to the pipeline telling the Protocol filters what's going on.
-        channel.pipeline().fireUserEventTriggered(new ConnectEvent());
-    }
-
-}
diff --git a/sandbox/test-java-knxnetip-driver/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIpWithEts5.java b/sandbox/test-java-knxnetip-driver/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIpWithEts5.java
index c055b78..5ffb666 100644
--- a/sandbox/test-java-knxnetip-driver/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIpWithEts5.java
+++ b/sandbox/test-java-knxnetip-driver/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIpWithEts5.java
@@ -19,14 +19,13 @@ under the License.
 package org.apache.plc4x.java.knxnetip;
 
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.base.connection.UdpSocketChannelFactory;
-import org.apache.plc4x.java.spi.connection.ChannelFactory;
-import org.apache.plc4x.java.spi.connection.NettyPlcConnection;
-import org.apache.plc4x.java.ets5.passive.*;
-import org.apache.plc4x.java.knxnetip.connection.KnxNetIpConnection;
+import org.apache.plc4x.java.ets5.passive.KNXGroupAddress;
+import org.apache.plc4x.java.ets5.passive.KNXGroupAddress2Level;
+import org.apache.plc4x.java.ets5.passive.KNXGroupAddress3Level;
+import org.apache.plc4x.java.ets5.passive.KNXGroupAddressFreeLevel;
 import org.apache.plc4x.java.knxnetip.ets5.Ets5Parser;
 import org.apache.plc4x.java.knxnetip.ets5.model.Ets5Model;
-import org.apache.plc4x.java.knxnetip.readwrite.*;
+import org.apache.plc4x.java.knxnetip.readwrite.KNXAddress;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,10 +49,11 @@ public class ManualKnxNetIpWithEts5 {
     }
 
     private void start() throws PlcConnectionException {
+        /*
         ChannelFactory channelFactory = new UdpSocketChannelFactory(
             gatewayInetAddress, KnxNetIpConnection.KNXNET_IP_PORT);
 
-        NettyPlcConnection connection = new KnxNetIpConnection(channelFactory, ""/*,
+        NettyPlcConnection connection = new KnxNetIpConnection(channelFactory, "",
             new PlcMessageToMessageCodec<KNXNetIPMessage, PlcRequestContainer>() {
                 @Override
                 protected void encode(ChannelHandlerContext ctx, PlcRequestContainer msg, List<Object> out) throws Exception {
@@ -121,7 +121,8 @@ public class ManualKnxNetIpWithEts5 {
             } catch (PlcConnectionException e) {
                 // Just ignore this.
             }
-        })*/);
+        }));
+        */
     }
 
     protected static String toString(KNXAddress knxAddress) {
diff --git a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/S7Driver.java b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/S7Driver.java
index 6869873..7cd4bfa 100644
--- a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/S7Driver.java
+++ b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/S7Driver.java
@@ -19,21 +19,25 @@ under the License.
 package org.apache.plc4x.java.s7.readwrite;
 
 import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.PlcDriver;
 import org.apache.plc4x.java.api.authentication.PlcAuthentication;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.s7.readwrite.connection.S7Connection;
-import org.apache.plc4x.java.api.PlcDriver;
+import org.apache.plc4x.java.s7.readwrite.connection.S7Configuration;
+import org.apache.plc4x.java.s7.readwrite.protocol.S7ProtocolLogic;
+import org.apache.plc4x.java.s7.readwrite.protocol.S7ProtocolMessage;
+import org.apache.plc4x.java.s7.readwrite.utils.S7PlcFieldHandler;
+import org.apache.plc4x.java.spi.connection.DefaultNettyPlcConnection;
+import org.apache.plc4x.java.spi.parser.ConnectionParser;
+import org.apache.plc4x.java.tcp.connection.TcpSocketChannelFactory;
 import org.osgi.service.component.annotations.Component;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 
 @Component(service = PlcDriver.class, immediate = true)
 public class S7Driver implements PlcDriver {
 
-    private static final Pattern S7_URI_PATTERN = Pattern.compile("^s7ng://(?<host>.*)(\\??<params>.*)?");
+    private static final int ISO_ON_TCP_PORT = 102;
 
     @Override
     public String getProtocolCode() {
@@ -47,24 +51,16 @@ public class S7Driver implements PlcDriver {
 
     @Override
     public PlcConnection connect(String url) throws PlcConnectionException {
-        Matcher matcher = S7_URI_PATTERN.matcher(url);
-        if (!matcher.matches()) {
-            throw new PlcConnectionException(
-                "Connection url doesn't match the format 's7ng://{host|ip}'");
-        }
-        String host = matcher.group("host");
-
-        // String params = matcher.group("params") != null ? matcher.group("params").substring(1) : null;
-        String params = "";
-
-        try {
-            InetAddress serverInetAddress = InetAddress.getByName(host);
-            return new S7Connection(serverInetAddress, params);
-        } catch (UnknownHostException e) {
-            throw new PlcConnectionException("Error parsing address", e);
-        } catch (Exception e) {
-            throw new PlcConnectionException("Error connecting to host", e);
-        }
+        ConnectionParser parser = new ConnectionParser(getProtocolCode(), url);
+        S7Configuration configuration = parser.createConfiguration(S7Configuration.class);
+        SocketAddress address = parser.getSocketAddress(ISO_ON_TCP_PORT);
+        return new DefaultNettyPlcConnection<>(new TcpSocketChannelFactory(address), true, new S7PlcFieldHandler(),
+            TPKTPacket.class,
+            new S7ProtocolMessage(),
+            new S7ProtocolLogic(
+                configuration
+            )
+        );
     }
 
     @Override
diff --git a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7Connection.java b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7Connection.java
deleted file mode 100644
index 6522496..0000000
--- a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7Connection.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.plc4x.java.s7.readwrite.connection;
-
-import io.netty.channel.*;
-import org.apache.plc4x.java.api.messages.PlcReadRequest;
-import org.apache.plc4x.java.api.messages.PlcWriteRequest;
-import org.apache.plc4x.java.spi.connection.ChannelFactory;
-import org.apache.plc4x.java.spi.connection.GenericNettyPlcConnection;
-import org.apache.plc4x.java.spi.connection.NettyPlcConnection;
-import org.apache.plc4x.java.spi.events.ConnectEvent;
-import org.apache.plc4x.java.spi.events.ConnectedEvent;
-import org.apache.plc4x.java.s7.readwrite.TPKTPacket;
-import org.apache.plc4x.java.s7.readwrite.protocol.S7ProtocolLogic;
-import org.apache.plc4x.java.s7.readwrite.protocol.S7ProtocolMessage;
-import org.apache.plc4x.java.s7.readwrite.utils.S7PlcFieldHandler;
-import org.apache.plc4x.java.spi.Plc4xNettyWrapper;
-import org.apache.plc4x.java.spi.Plc4xProtocolBase;
-import org.apache.plc4x.java.spi.messages.DefaultPlcReadRequest;
-import org.apache.plc4x.java.spi.messages.DefaultPlcWriteRequest;
-import org.apache.plc4x.java.spi.messages.PlcReader;
-import org.apache.plc4x.java.spi.messages.PlcWriter;
-import org.apache.plc4x.java.spi.parser.ConnectionParser;
-import org.apache.plc4x.java.tcp.connection.TcpSocketChannelFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetAddress;
-import java.util.concurrent.CompletableFuture;
-
-public class S7Connection extends GenericNettyPlcConnection<TPKTPacket> implements PlcReader, PlcWriter {
-
-    private static final int ISO_ON_TCP_PORT = 102;
-
-    private static final Logger logger = LoggerFactory.getLogger(S7Connection.class);
-
-    public S7Connection(InetAddress address, String params) {
-        this(new TcpSocketChannelFactory(address, ISO_ON_TCP_PORT), params);
-    }
-
-    public S7Connection(ChannelFactory channelFactory, String params) {
-        super(channelFactory, true, new S7PlcFieldHandler(),
-            TPKTPacket.class,
-            new S7ProtocolMessage(),
-            new S7ProtocolLogic(
-                ConnectionParser.parse("a://1.1.1.1/" + params, S7Configuration.class)
-            )
-        );
-    }
-
-}
diff --git a/sandbox/test-streampipes-plc4x-adapters/src/main/java/org/apache/plc4x/java/streampipes/adapters/source/knxnetip/KnxNetIpAdapter.java b/sandbox/test-streampipes-plc4x-adapters/src/main/java/org/apache/plc4x/java/streampipes/adapters/source/knxnetip/KnxNetIpAdapter.java
index e885b33..f50dc97 100644
--- a/sandbox/test-streampipes-plc4x-adapters/src/main/java/org/apache/plc4x/java/streampipes/adapters/source/knxnetip/KnxNetIpAdapter.java
+++ b/sandbox/test-streampipes-plc4x-adapters/src/main/java/org/apache/plc4x/java/streampipes/adapters/source/knxnetip/KnxNetIpAdapter.java
@@ -18,16 +18,9 @@ under the License.
 */
 package org.apache.plc4x.java.streampipes.adapters.source.knxnetip;
 
-import io.netty.channel.ChannelHandlerContext;
-import org.apache.commons.codec.binary.Hex;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.base.connection.UdpSocketChannelFactory;
-import org.apache.plc4x.java.spi.PlcMessageToMessageCodec;
-import org.apache.plc4x.java.spi.connection.ChannelFactory;
+import org.apache.plc4x.java.knxnetip.readwrite.KNXAddress;
 import org.apache.plc4x.java.spi.connection.NettyPlcConnection;
-import org.apache.plc4x.java.spi.messages.PlcRequestContainer;
-import org.apache.plc4x.java.knxnetip.connection.KnxNetIpConnection;
-import org.apache.plc4x.java.knxnetip.readwrite.*;
 import org.apache.plc4x.java.streampipes.shared.source.knxnetip.Constants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,12 +41,8 @@ import org.streampipes.sdk.helpers.Labels;
 import org.streampipes.sdk.utils.Datatypes;
 import org.streampipes.vocabulary.SO;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 public class KnxNetIpAdapter extends SpecificDataStreamAdapter {
 
@@ -134,11 +123,12 @@ public class KnxNetIpAdapter extends SpecificDataStreamAdapter {
         if((connection != null) && (connection.isConnected())) {
             return;
         }
+        /*
         try {
             InetAddress inetAddress = InetAddress.getByName(gatewayIp);
             ChannelFactory channelFactory = new UdpSocketChannelFactory(inetAddress, KnxNetIpConnection.KNXNET_IP_PORT);
 
-            connection = new KnxNetIpConnection(channelFactory, ""/*,
+            connection = new KnxNetIpConnection(channelFactory, "",
                 new PlcMessageToMessageCodec<KNXNetIPMessage, PlcRequestContainer>() {
 
                 @Override
@@ -183,8 +173,9 @@ public class KnxNetIpAdapter extends SpecificDataStreamAdapter {
                 protected void encode(ChannelHandlerContext ctx, PlcRequestContainer msg, List<Object> out) {
                     // Ignore this as we don't send anything.
                 }
-            }*/);
+            });
             connection.connect();
+
         } catch (PlcConnectionException e) {
             logger.error("An error occurred starting the BACnet/IP driver", e);
             throw new AdapterException("An error occurred starting the BACnet/IP driver");
@@ -194,6 +185,7 @@ public class KnxNetIpAdapter extends SpecificDataStreamAdapter {
         } catch (Exception e) {
             logger.error("Something strange went wrong.", e);
         }
+        */
     }
 
     @Override