You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by jf...@apache.org on 2019/08/21 15:19:30 UTC

[plc4x] 01/02: Big progress with the new API. Minimal WOrking example... Works.

This is an automated email from the ASF dual-hosted git repository.

jfeinauer pushed a commit to branch feature/new-api
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit fb758c38c5b070422eaa4211d233c0633a9e4fac
Author: Julian Feinauer <j....@pragmaticminds.de>
AuthorDate: Wed Aug 21 15:48:12 2019 +0200

    Big progress with the new API. Minimal WOrking example... Works.
---
 .../org/apache/plc4x/java/base/next/Example.java   | 36 ++++-----
 .../{TransactionLevel.java => IsolationLevel.java} |  2 +-
 .../plc4x/java/base/next/PlcBatchRequest.java      | 13 +++
 .../apache/plc4x/java/base/next/PlcConnection.java | 12 ++-
 .../plc4x/java/base/next/PlcConnectionImpl.java    | 82 +++++++++++++++++++
 .../org/apache/plc4x/java/base/next/PlcDriver.java | 26 +++++-
 .../apache/plc4x/java/base/next/PlcDriverImpl.java | 42 ++++++++++
 .../plc4x/java/base/next/PlcDriverManager.java     |  3 +-
 .../plc4x/java/base/next/PlcDriverManagerImpl.java | 48 +++++++++++
 .../apache/plc4x/java/base/next/PlcExecutor.java   | 10 +++
 .../apache/plc4x/java/base/next/PlcProtocol.java   | 27 +++++++
 .../plc4x/java/base/next/PlcReadRequest.java       |  4 -
 .../plc4x/java/base/next/PlcTransaction.java       | 13 ---
 .../plc4x/java/base/next/PlcTransportFactory.java  | 88 ++++++++++++++++++++
 .../plc4x/java/base/next/PlcWriteRequest.java      |  4 -
 .../plc4x/java/base/next/RequestInformation.java   | 69 ++++++++++++++++
 .../java/base/next/commands/FailedRequest.java     | 15 ++++
 .../plc4x/java/base/next/commands/Message.java     | 14 ++++
 .../plc4x/java/base/next/commands/ReadRequest.java | 19 +++++
 .../plc4x/java/base/next/commands/Response.java    |  9 +++
 .../java/base/next/commands/WriteRequest.java      |  8 ++
 .../plc4x/java/base/next/netty/PlcApiCodec.java    | 94 ++++++++++++++++++++++
 .../java/base/next/netty/TcpTransportFactory.java  | 20 +++++
 .../java/base/next/PlcDriverManagerImplTest.java   | 84 +++++++++++++++++++
 24 files changed, 697 insertions(+), 45 deletions(-)

diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/Example.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/Example.java
index 062b60b..a7a5a36 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/Example.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/Example.java
@@ -5,23 +5,23 @@ import java.util.concurrent.ExecutionException;
 public class Example {
 
     public static void main(String[] args) throws ExecutionException, InterruptedException {
-        PlcDriverManager driverManager = null;
-
-        // Get a Connection
-        PlcConnection conn = driverManager
-            .connect("s7://xxxxxx")
-            .get();
-
-        // Get a Driver
-        PlcDriver driver = driverManager
-            .getDriver("s7://xxxxx");
-
-        // Get Connection from Driver
-        PlcConnection conn2 = driver.connect()
-            .get();
-
-        // Now do a request
-//        conn.startTransaction(TransactionLevel.WEAK)
-//            .read("field")
+//        PlcDriverManager driverManager = null;
+//
+//        // Get a Connection
+//        PlcConnection conn = driverManager
+//            .connect("s7://xxxxxx")
+//            .get();
+//
+//        // Get a Driver
+//        PlcDriver driver = driverManager
+//            .getDriver("s7://xxxxx");
+//
+//        // Get Connection from Driver
+//        PlcConnection conn2 = driver.connect()
+//            .get();
+//
+//        // Now do a request
+////        conn.startBatch(IsolationLevel.WEAK)
+////            .read("field")
     }
 }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/TransactionLevel.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/IsolationLevel.java
similarity index 63%
rename from plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/TransactionLevel.java
rename to plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/IsolationLevel.java
index 9c5cb57..d3b3ee9 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/TransactionLevel.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/IsolationLevel.java
@@ -1,5 +1,5 @@
 package org.apache.plc4x.java.base.next;
 
-public enum TransactionLevel {
+public enum IsolationLevel {
     WEAK
 }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcBatchRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcBatchRequest.java
new file mode 100644
index 0000000..ce74230
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcBatchRequest.java
@@ -0,0 +1,13 @@
+package org.apache.plc4x.java.base.next;
+
+import java.util.concurrent.Future;
+
+public interface PlcBatchRequest {
+
+    PlcBatchRequest read(String alias, String fieldQuery);
+
+    <T> PlcBatchRequest write(String alias, String fieldQuery, T value);
+
+    Future<PlcTransactionResponse> execute();
+
+}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcConnection.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcConnection.java
index 26d5955..8863ad1 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcConnection.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcConnection.java
@@ -1,14 +1,20 @@
 package org.apache.plc4x.java.base.next;
 
+import org.apache.plc4x.java.base.next.commands.Response;
+
 import java.util.concurrent.Future;
 
 public interface PlcConnection {
 
-    Future<PlcResponse> execute(PlcRequest request);
-
     Future<PlcResponse> read(String fieldQuery);
 
     <T> Future<PlcResponse> write(String fieldQuery, T value);
 
-    PlcTransaction startTransaction(TransactionLevel level);
+    PlcBatchRequest startBatch(IsolationLevel level);
+
+    /**
+     * Is called from downstream to notify the Connection object, that there is a response
+     * for one of its requests.
+     */
+    void respond(Response response);
 }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcConnectionImpl.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcConnectionImpl.java
new file mode 100644
index 0000000..38e81af
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcConnectionImpl.java
@@ -0,0 +1,82 @@
+package org.apache.plc4x.java.base.next;
+
+import io.netty.channel.Channel;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.plc4x.java.base.next.commands.FailedRequest;
+import org.apache.plc4x.java.base.next.commands.ReadRequest;
+import org.apache.plc4x.java.base.next.commands.Response;
+import org.apache.plc4x.java.base.next.netty.PlcApiCodec;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Wrapper around nettys {@link Channel} which represents one Connection to a device.
+ * This implementation should be (mostly) generic and drivers do NOT have to subclass it.
+ */
+public class PlcConnectionImpl implements PlcConnection {
+
+    private final PlcDriver driver;
+    private final PlcApiCodec codec;
+    private final Channel channel;
+
+    // We have to keep track of all "in flight" messages here
+    private final Map<Integer, RequestInformation> requests;
+    private final AtomicInteger transactionCounter = new AtomicInteger(0);
+
+    public PlcConnectionImpl(PlcDriver driver, PlcApiCodec codec, Channel channel) {
+        this.driver = driver;
+        this.codec = codec;
+        this.channel = channel;
+        requests = new ConcurrentHashMap<>(); // Better to be thread safe here
+        // Register this object with the Codec
+        this.codec.registerConnection(this);
+    }
+
+    @Override public Future<PlcResponse> read(String fieldQuery) {
+        // TODO do we parse here or in the protocol??
+        // Prepare the container here
+        final int transactionId = transactionCounter.getAndIncrement();
+        final ReadRequest request = new ReadRequest(transactionId, fieldQuery);
+        // Create Request Information Object
+        final RequestInformation information = new RequestInformation(transactionId);
+        information.setRequest(request);
+        information.setStatus(RequestInformation.Status.PREPARING);
+        // Register Request
+        requests.put(transactionId, information);
+
+        channel.pipeline().write(request).addListener(future -> {
+            if (future.isSuccess()) {
+                information.setStatus(RequestInformation.Status.SENT);
+            } else {
+                // TODO make convencience method here?
+                information.setStatus(RequestInformation.Status.FAILED);
+                information.setCause(future.cause());
+                information.getFuture().completeExceptionally(future.cause());
+            }
+        });
+        return information.getFuture();
+    }
+
+    @Override public <T> Future<PlcResponse> write(String fieldQuery, T value) {
+        // TODO use common base with #read()
+        throw new NotImplementedException("");
+    }
+
+    @Override public PlcBatchRequest startBatch(IsolationLevel level) {
+        throw new NotImplementedException("");
+    }
+
+    @Override public void respond(Response response) {
+        // Take the necessary action
+        assert requests.containsKey(response.getTransactionId());
+        final RequestInformation information = requests.get(response.getTransactionId());
+        if (response instanceof FailedRequest) {
+            information.getFuture().completeExceptionally(((FailedRequest) response).getCause());
+        } else {
+            throw new IllegalStateException("This type of response " + response.getClass() + " is not implemented");
+        }
+    }
+}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcDriver.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcDriver.java
index b9eb0d2..c553bba 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcDriver.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcDriver.java
@@ -1,11 +1,35 @@
 package org.apache.plc4x.java.base.next;
 
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.base.next.netty.TcpTransportFactory;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.util.concurrent.Future;
 
+/**
+ * Question: Is this a shared instance (Singleton) or one for each "connection",
+ * then we need a factory.
+ */
 public interface PlcDriver {
 
     boolean validate(String fieldQuery);
 
-    Future<PlcConnection> connect();
+    Future<PlcConnection> connect(String connection) throws PlcConnectionException;
+
+    boolean accepts(String connection);
+
+    interface ConnectionParserFactory {
+
+        ConnectionParser parse(String connection) throws PlcConnectionException;
+
+    }
+
+    interface ConnectionParser {
+
+        PlcTransportFactory getTransport();
+
+        SocketAddress getSocketAddress();
 
+    }
 }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcDriverImpl.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcDriverImpl.java
new file mode 100644
index 0000000..bc568d4
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcDriverImpl.java
@@ -0,0 +1,42 @@
+package org.apache.plc4x.java.base.next;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+
+/**
+ * Base class for the Drivers.
+ * All that a new Protocol has to override is the Parsing for the Fields (TODO general)
+ * and a specific instance of a Protocol.
+ */
+public abstract class PlcDriverImpl implements PlcDriver {
+
+    protected abstract ConnectionParserFactory parserFactory();
+
+    protected abstract PlcProtocol getProtocol();
+
+    @Override public Future<PlcConnection> connect(String connection) throws PlcConnectionException {
+        // Parse connection
+        final ConnectionParser parser = parserFactory().parse(connection);
+        // Prepare the Completable Future
+        final CompletableFuture<PlcConnection> returnFuture = new CompletableFuture<>();
+        // Establish the connection with callback for Async
+        final PlcTransportFactory transportFactory = parser.getTransport();
+        final ChannelFuture channelFuture = transportFactory.connect(getProtocol(), parser.getSocketAddress());
+        channelFuture.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
+            @Override public void operationComplete(io.netty.util.concurrent.Future<? super Void> f) throws Exception {
+                // If sucesssfull return the new Connection instance, if not, cancel
+                if (f.isSuccess()) {
+                    returnFuture.complete(new PlcConnectionImpl(PlcDriverImpl.this, transportFactory.getCodec(), ((ChannelFuture) f).channel()));
+                } else {
+                    returnFuture.completeExceptionally(f.cause());
+                }
+            }
+        });
+        return returnFuture;
+    }
+
+}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcDriverManager.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcDriverManager.java
index 2db4efe..7e3c7ad 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcDriverManager.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcDriverManager.java
@@ -1,11 +1,12 @@
 package org.apache.plc4x.java.base.next;
 
+import java.util.Optional;
 import java.util.concurrent.Future;
 
 public interface PlcDriverManager {
 
     Future<PlcConnection> connect(String connection);
 
-    PlcDriver getDriver(String connection);
+    Optional<PlcDriver> getDriver(String connection);
 
 }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcDriverManagerImpl.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcDriverManagerImpl.java
new file mode 100644
index 0000000..52b5c2d
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcDriverManagerImpl.java
@@ -0,0 +1,48 @@
+package org.apache.plc4x.java.base.next;
+
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.Future;
+
+public class PlcDriverManagerImpl implements PlcDriverManager {
+
+    private static final Logger logger = LoggerFactory.getLogger(PlcDriverManagerImpl.class);
+    
+    private final Collection<PlcDriver> regisgtered;
+
+    public PlcDriverManagerImpl() {
+        // TODO Implement this
+        regisgtered = null;
+    }
+
+    /** For testing **/
+    PlcDriverManagerImpl(Collection<PlcDriver> regisgtered) {
+        this.regisgtered = regisgtered;
+    }
+
+    @Override public Future<PlcConnection> connect(String connection) {
+        return getDriver(connection)
+            .map(drv -> {
+                try {
+                    return drv.connect(connection);
+                } catch (PlcConnectionException e) {
+                    throw new PlcRuntimeException(e);
+                }
+            })
+            .orElseThrow(() -> new RuntimeException("No suitable driver found for connection ‘" + connection + "‘"));
+    }
+
+    @Override public Optional<PlcDriver> getDriver(String connection) {
+        for (PlcDriver driver : regisgtered) {
+            if (driver.accepts(connection)) {
+                return Optional.of(driver);
+            }
+        }
+        return Optional.empty();
+    }
+}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcExecutor.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcExecutor.java
new file mode 100644
index 0000000..0b57d68
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcExecutor.java
@@ -0,0 +1,10 @@
+package org.apache.plc4x.java.base.next;
+
+/**
+ * This class is the link between the API classes like
+ * {@link PlcDriver}, @{@link PlcConnection} and so on, the Protocols and the Transport
+ * layer which is most likely always netty.
+ */
+public class PlcExecutor {
+
+}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcProtocol.java
new file mode 100644
index 0000000..a092753
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcProtocol.java
@@ -0,0 +1,27 @@
+package org.apache.plc4x.java.base.next;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.plc4x.java.base.next.commands.ReadRequest;
+import org.apache.plc4x.java.base.next.commands.Response;
+
+public interface PlcProtocol {
+
+    void init();
+
+    void close();
+
+    /**
+     * This method encodes a read request to bytes
+     */
+    void encode(ReadRequest command, ByteBuf out);
+
+    /**
+     * This method gets offered a buffer
+     * @throws UnableToParseException whenever there are not enough bytes to decode a response
+     */
+    Response decode(ByteBuf buf) throws UnableToParseException;
+
+    class UnableToParseException extends Exception {
+
+    }
+}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcReadRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcReadRequest.java
deleted file mode 100644
index a488880..0000000
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcReadRequest.java
+++ /dev/null
@@ -1,4 +0,0 @@
-package org.apache.plc4x.java.base.next;
-
-public class PlcReadRequest implements PlcRequest {
-}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcTransaction.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcTransaction.java
deleted file mode 100644
index c6b5d90..0000000
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcTransaction.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.apache.plc4x.java.base.next;
-
-import java.util.concurrent.Future;
-
-public interface PlcTransaction {
-
-    PlcTransaction read(String alias, String fieldQuery);
-
-    <T> PlcTransaction write(String alias, String fieldQuery, T value);
-
-    Future<PlcTransactionResponse> execute();
-
-}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcTransportFactory.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcTransportFactory.java
new file mode 100644
index 0000000..a33b56c
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcTransportFactory.java
@@ -0,0 +1,88 @@
+package org.apache.plc4x.java.base.next;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.base.next.netty.PlcApiCodec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.SocketAddress;
+
+/**
+ * This represents a Transport layer which is used to generate a Netty Socket.
+ */
+public abstract class PlcTransportFactory {
+
+    private static final Logger logger = LoggerFactory.getLogger(PlcTransportFactory.class);
+
+    private PlcApiCodec codec;
+
+    /** Its necessary to have the codec to add a Callback as Handler **/
+    public PlcApiCodec getCodec() {
+        return codec;
+    }
+
+    /**
+     *
+     * This Method creates a new Channel and is only used internally by
+     * {@link org.apache.plc4x.java.base.next.PlcDriver}s.
+     *
+     * @param protocol Plc Protocol to use on this Connection
+     * @param address  Socket Address to connect to
+     * @throws PlcConnectionException If no connection can be established
+     *
+     * <b>Important</b> This method is responsible for closing the {@link EventLoopGroup}s
+     * it creates!
+     */
+    ChannelFuture connect(PlcProtocol protocol, SocketAddress address) throws PlcConnectionException {
+        logger.debug("Trying to establish connection to {}", address);
+        try {
+            Bootstrap bootstrap = new Bootstrap();
+            final EventLoopGroup workerGroup = createEventLoopGroup();
+            bootstrap.group(workerGroup);
+            bootstrap.channel(getSocketClass());
+            // TODO we should use an explicit (configurable?) timeout here
+            // bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
+
+            // Instanciate codec
+            this.codec = new PlcApiCodec(protocol);
+
+            bootstrap.handler(new ChannelInitializer() {
+                @Override protected void initChannel(Channel ch) throws Exception {
+                    ch.pipeline().addLast(codec);
+                }
+            });
+            // Start the client.
+            final ChannelFuture f = bootstrap.connect(address);
+            f.addListener(new GenericFutureListener<Future<? super Void>>() {
+                @Override public void operationComplete(Future<? super Void> future) throws Exception {
+                    if (!future.isSuccess()) {
+                        logger.info("Unable to connect, shutting down worker thread.");
+                        workerGroup.shutdownGracefully();
+                    }
+                }
+            });
+            return f;
+        } catch (Exception e) {
+            throw new PlcConnectionException("Error creating channel.", e);
+        }
+    }
+
+    /** Use NiO by default **/
+    protected EventLoopGroup createEventLoopGroup() {
+        return new NioEventLoopGroup();
+    }
+
+    protected abstract Class<? extends Channel> getSocketClass();
+
+    /** Can configure the bootstrap, if necessary **/
+    protected abstract void configure(Bootstrap bootstrap);
+
+}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcWriteRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcWriteRequest.java
deleted file mode 100644
index 0f525da..0000000
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcWriteRequest.java
+++ /dev/null
@@ -1,4 +0,0 @@
-package org.apache.plc4x.java.base.next;
-
-public class PlcWriteRequest implements PlcRequest {
-}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/RequestInformation.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/RequestInformation.java
new file mode 100644
index 0000000..08e7508
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/RequestInformation.java
@@ -0,0 +1,69 @@
+package org.apache.plc4x.java.base.next;
+
+import org.apache.plc4x.java.base.next.commands.Message;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This is a Container object which holds information for in-flight requests.
+ * It also tracks the lifecycle of each request.
+ */
+public class RequestInformation {
+
+    private final int transactionId;
+    private final CompletableFuture<PlcResponse> future;
+    private Status status;
+    private Message request;
+    private Message response;
+    private Throwable cause;
+
+    public RequestInformation(int transactionId) {
+        this.transactionId = transactionId;
+        future = new CompletableFuture<>();
+    }
+
+    public void setRequest(Message request) {
+        this.request = request;
+    }
+
+    public void setResponse(Message response) {
+        this.response = response;
+    }
+
+    public void setCause(Throwable cause) {
+        this.cause = cause;
+    }
+
+    public Message getRequest() {
+        return request;
+    }
+
+    public Message getResponse() {
+        return response;
+    }
+
+    public Throwable getCause() {
+        return cause;
+    }
+
+    public CompletableFuture<PlcResponse> getFuture() {
+        return future;
+    }
+
+    public Status getStatus() {
+        return status;
+    }
+
+    public void setStatus(Status status) {
+        this.status = status;
+    }
+
+    enum Status {
+        PREPARING,
+        WAIT_TO_SEND,
+        SENT,
+        RESPONDED,
+        FINISHED,
+        FAILED
+    }
+}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/commands/FailedRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/commands/FailedRequest.java
new file mode 100644
index 0000000..00701f2
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/commands/FailedRequest.java
@@ -0,0 +1,15 @@
+package org.apache.plc4x.java.base.next.commands;
+
+public class FailedRequest extends Response{
+
+    private final Throwable cause;
+
+    protected FailedRequest(int transactionId, Throwable cause) {
+        super(transactionId);
+        this.cause = cause;
+    }
+
+    public Throwable getCause() {
+        return cause;
+    }
+}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/commands/Message.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/commands/Message.java
new file mode 100644
index 0000000..ea65359
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/commands/Message.java
@@ -0,0 +1,14 @@
+package org.apache.plc4x.java.base.next.commands;
+
+public abstract class Message {
+
+    private final int transactionId;
+
+    protected Message(int transactionId) {
+        this.transactionId = transactionId;
+    }
+
+    public int getTransactionId() {
+        return transactionId;
+    }
+}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/commands/ReadRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/commands/ReadRequest.java
new file mode 100644
index 0000000..19898bd
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/commands/ReadRequest.java
@@ -0,0 +1,19 @@
+package org.apache.plc4x.java.base.next.commands;
+
+public class ReadRequest extends Message {
+
+    private final String query;
+
+    public ReadRequest(int transactionId, String query) {
+        super(transactionId);
+        this.query = query;
+    }
+
+    public String getQuery() {
+        return query;
+    }
+
+    @Override public String toString() {
+        return super.toString();
+    }
+}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/commands/Response.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/commands/Response.java
new file mode 100644
index 0000000..0cdc7bc
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/commands/Response.java
@@ -0,0 +1,9 @@
+package org.apache.plc4x.java.base.next.commands;
+
+public class Response extends Message {
+
+    protected Response(int transactionId) {
+        super(transactionId);
+    }
+
+}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/commands/WriteRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/commands/WriteRequest.java
new file mode 100644
index 0000000..8a1ec76
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/commands/WriteRequest.java
@@ -0,0 +1,8 @@
+package org.apache.plc4x.java.base.next.commands;
+
+public class WriteRequest extends Message {
+
+    protected WriteRequest(int transactionId) {
+        super(transactionId);
+    }
+}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/netty/PlcApiCodec.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/netty/PlcApiCodec.java
new file mode 100644
index 0000000..81a4cef
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/netty/PlcApiCodec.java
@@ -0,0 +1,94 @@
+package org.apache.plc4x.java.base.next.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.plc4x.java.base.next.PlcConnection;
+import org.apache.plc4x.java.base.next.PlcConnectionImpl;
+import org.apache.plc4x.java.base.next.PlcProtocol;
+import org.apache.plc4x.java.base.next.commands.ReadRequest;
+import org.apache.plc4x.java.base.next.commands.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is the binding element between a {@link org.apache.plc4x.java.base.next.PlcProtocol},
+ * netty and the High Level API classes like @{@link org.apache.plc4x.java.base.next.PlcConnection}.
+ */
+public class PlcApiCodec extends ChannelDuplexHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(PlcApiCodec.class);
+
+    private final PlcProtocol protocol;
+
+    // We have to keep track of all "in flight" messages here
+    private PlcConnection plcConnection;
+
+    // TODO perhaps factory or class name?
+    // TODO Perhaps connection layer here?
+    public PlcApiCodec(PlcProtocol protocol) {
+        this.protocol = protocol;
+    }
+
+    @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+        this.protocol.init();
+    }
+
+    @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+        this.protocol.close();
+    }
+
+    @Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        // ?
+    }
+
+    @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        // ?
+    }
+
+    @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+        // ?
+    }
+
+    @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+        // ?
+    }
+
+    @Override public void write(ChannelHandlerContext ctx, Object command, ChannelPromise promise) throws Exception {
+        // Check exact message type and forward
+        if (command instanceof ReadRequest) {
+            logger.debug("Received read request {}", command);
+            final ByteBuf buffer = Unpooled.buffer();
+            protocol.encode(((ReadRequest) command), buffer);
+            ctx.writeAndFlush(buffer);
+        } else {
+            throw new IllegalStateException("The Command " + command.getClass() + " is not implemented!");
+        }
+    }
+
+    @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        final ByteBuf buf = (ByteBuf) msg;
+        logger.debug("Received message\n"
+            + ByteBufUtil.prettyHexDump(buf));
+        buf.markReaderIndex();
+        try {
+            Response response = protocol.decode(buf);
+            this.plcConnection.respond(response);
+        } catch (PlcProtocol.UnableToParseException e) {
+            buf.resetReaderIndex();
+        }
+        ReferenceCountUtil.release(msg);
+    }
+
+    @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        // General Exception handling!
+    }
+
+    public void registerConnection(PlcConnectionImpl plcConnection) {
+        this.plcConnection = plcConnection;
+    }
+}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/netty/TcpTransportFactory.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/netty/TcpTransportFactory.java
new file mode 100644
index 0000000..3dd5e1e
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/netty/TcpTransportFactory.java
@@ -0,0 +1,20 @@
+package org.apache.plc4x.java.base.next.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.plc4x.java.base.next.PlcTransportFactory;
+
+public class TcpTransportFactory extends PlcTransportFactory {
+
+    @Override protected Class<? extends Channel> getSocketClass() {
+        return NioSocketChannel.class;
+    }
+
+    @Override protected void configure(Bootstrap bootstrap) {
+        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
+        bootstrap.option(ChannelOption.TCP_NODELAY, true);
+    }
+
+}
diff --git a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/next/PlcDriverManagerImplTest.java b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/next/PlcDriverManagerImplTest.java
new file mode 100644
index 0000000..6ba5b84
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/next/PlcDriverManagerImplTest.java
@@ -0,0 +1,84 @@
+package org.apache.plc4x.java.base.next;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.base.next.commands.ReadRequest;
+import org.apache.plc4x.java.base.next.commands.Response;
+import org.apache.plc4x.java.base.next.netty.TcpTransportFactory;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+public class PlcDriverManagerImplTest {
+
+    @Test
+    public void connect() throws PlcConnectionException, ExecutionException, InterruptedException {
+        final PlcDriverManagerImpl driverManager = new PlcDriverManagerImpl(Collections.singletonList(new TestDriver()));
+        final Optional<PlcDriver> driver = driverManager.getDriver("Hallo");
+        assert driver.isPresent();
+        final Future<PlcConnection> connectFuture = driver.get().connect("Hallo");
+        final PlcConnection connection = connectFuture.get();
+        final Future<PlcResponse> readFuture = connection.read("asdf");
+        final PlcResponse response = readFuture.get();
+
+        System.out.println("Finished...");
+    }
+
+    private static class TestDriver extends PlcDriverImpl {
+
+        @Override protected ConnectionParserFactory parserFactory() {
+            return new TestParserFactory();
+        }
+
+        @Override protected PlcProtocol getProtocol() {
+            return new TestProtocol();
+        }
+
+        @Override public boolean validate(String fieldQuery) {
+            return true;
+        }
+
+        @Override public boolean accepts(String connection) {
+            return true;
+        }
+
+        private static class TestProtocol implements PlcProtocol {
+            @Override public void init() {
+
+            }
+
+            @Override public void close() {
+
+            }
+
+            @Override public void encode(ReadRequest command, ByteBuf out) {
+                out.writeBytes(command.getQuery().getBytes());
+            }
+
+            @Override public Response decode(ByteBuf buf) throws UnableToParseException {
+                throw new UnableToParseException();
+            }
+        }
+
+        private static class TestParserFactory implements ConnectionParserFactory {
+            @Override public ConnectionParser parse(String connection) throws PlcConnectionException {
+                return new TestParser();
+            }
+
+            private static class TestParser implements ConnectionParser {
+                @Override public PlcTransportFactory getTransport() {
+                    return new TcpTransportFactory();
+                }
+
+                @Override public SocketAddress getSocketAddress() {
+                    return new InetSocketAddress("localhost", 1234);
+                }
+            }
+        }
+    }
+}
\ No newline at end of file