You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by jf...@apache.org on 2019/08/21 15:19:30 UTC
[plc4x] 01/02: Big progress with the new API. Minimal Working
example... Works.
This is an automated email from the ASF dual-hosted git repository.
jfeinauer pushed a commit to branch feature/new-api
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit fb758c38c5b070422eaa4211d233c0633a9e4fac
Author: Julian Feinauer <j....@pragmaticminds.de>
AuthorDate: Wed Aug 21 15:48:12 2019 +0200
Big progress with the new API. Minimal Working example... Works.
---
.../org/apache/plc4x/java/base/next/Example.java | 36 ++++-----
.../{TransactionLevel.java => IsolationLevel.java} | 2 +-
.../plc4x/java/base/next/PlcBatchRequest.java | 13 +++
.../apache/plc4x/java/base/next/PlcConnection.java | 12 ++-
.../plc4x/java/base/next/PlcConnectionImpl.java | 82 +++++++++++++++++++
.../org/apache/plc4x/java/base/next/PlcDriver.java | 26 +++++-
.../apache/plc4x/java/base/next/PlcDriverImpl.java | 42 ++++++++++
.../plc4x/java/base/next/PlcDriverManager.java | 3 +-
.../plc4x/java/base/next/PlcDriverManagerImpl.java | 48 +++++++++++
.../apache/plc4x/java/base/next/PlcExecutor.java | 10 +++
.../apache/plc4x/java/base/next/PlcProtocol.java | 27 +++++++
.../plc4x/java/base/next/PlcReadRequest.java | 4 -
.../plc4x/java/base/next/PlcTransaction.java | 13 ---
.../plc4x/java/base/next/PlcTransportFactory.java | 88 ++++++++++++++++++++
.../plc4x/java/base/next/PlcWriteRequest.java | 4 -
.../plc4x/java/base/next/RequestInformation.java | 69 ++++++++++++++++
.../java/base/next/commands/FailedRequest.java | 15 ++++
.../plc4x/java/base/next/commands/Message.java | 14 ++++
.../plc4x/java/base/next/commands/ReadRequest.java | 19 +++++
.../plc4x/java/base/next/commands/Response.java | 9 +++
.../java/base/next/commands/WriteRequest.java | 8 ++
.../plc4x/java/base/next/netty/PlcApiCodec.java | 94 ++++++++++++++++++++++
.../java/base/next/netty/TcpTransportFactory.java | 20 +++++
.../java/base/next/PlcDriverManagerImplTest.java | 84 +++++++++++++++++++
24 files changed, 697 insertions(+), 45 deletions(-)
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/Example.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/Example.java
index 062b60b..a7a5a36 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/Example.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/Example.java
@@ -5,23 +5,23 @@ import java.util.concurrent.ExecutionException;
public class Example {
public static void main(String[] args) throws ExecutionException, InterruptedException {
- PlcDriverManager driverManager = null;
-
- // Get a Connection
- PlcConnection conn = driverManager
- .connect("s7://xxxxxx")
- .get();
-
- // Get a Driver
- PlcDriver driver = driverManager
- .getDriver("s7://xxxxx");
-
- // Get Connection from Driver
- PlcConnection conn2 = driver.connect()
- .get();
-
- // Now do a request
-// conn.startTransaction(TransactionLevel.WEAK)
-// .read("field")
+// PlcDriverManager driverManager = null;
+//
+// // Get a Connection
+// PlcConnection conn = driverManager
+// .connect("s7://xxxxxx")
+// .get();
+//
+// // Get a Driver
+// PlcDriver driver = driverManager
+// .getDriver("s7://xxxxx");
+//
+// // Get Connection from Driver
+// PlcConnection conn2 = driver.connect()
+// .get();
+//
+// // Now do a request
+//// conn.startBatch(IsolationLevel.WEAK)
+//// .read("field")
}
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/TransactionLevel.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/IsolationLevel.java
similarity index 63%
rename from plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/TransactionLevel.java
rename to plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/IsolationLevel.java
index 9c5cb57..d3b3ee9 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/TransactionLevel.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/IsolationLevel.java
@@ -1,5 +1,5 @@
package org.apache.plc4x.java.base.next;
-public enum TransactionLevel {
+public enum IsolationLevel {
WEAK
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcBatchRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcBatchRequest.java
new file mode 100644
index 0000000..ce74230
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcBatchRequest.java
@@ -0,0 +1,13 @@
+package org.apache.plc4x.java.base.next;
+
+import java.util.concurrent.Future;
+
+public interface PlcBatchRequest {
+
+ PlcBatchRequest read(String alias, String fieldQuery);
+
+ <T> PlcBatchRequest write(String alias, String fieldQuery, T value);
+
+ Future<PlcTransactionResponse> execute();
+
+}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcConnection.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcConnection.java
index 26d5955..8863ad1 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcConnection.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcConnection.java
@@ -1,14 +1,20 @@
package org.apache.plc4x.java.base.next;
+import org.apache.plc4x.java.base.next.commands.Response;
+
import java.util.concurrent.Future;
public interface PlcConnection {
- Future<PlcResponse> execute(PlcRequest request);
-
Future<PlcResponse> read(String fieldQuery);
<T> Future<PlcResponse> write(String fieldQuery, T value);
- PlcTransaction startTransaction(TransactionLevel level);
+ PlcBatchRequest startBatch(IsolationLevel level);
+
+ /**
+ * Called from downstream to notify the Connection object that there is a response
+ * for one of its requests.
+ */
+ void respond(Response response);
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcConnectionImpl.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcConnectionImpl.java
new file mode 100644
index 0000000..38e81af
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcConnectionImpl.java
@@ -0,0 +1,82 @@
+package org.apache.plc4x.java.base.next;
+
+import io.netty.channel.Channel;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.plc4x.java.base.next.commands.FailedRequest;
+import org.apache.plc4x.java.base.next.commands.ReadRequest;
+import org.apache.plc4x.java.base.next.commands.Response;
+import org.apache.plc4x.java.base.next.netty.PlcApiCodec;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Wrapper around Netty's {@link Channel} which represents one Connection to a device.
+ * This implementation should be (mostly) generic and drivers do NOT have to subclass it.
+ */
+public class PlcConnectionImpl implements PlcConnection {
+
+ private final PlcDriver driver;
+ private final PlcApiCodec codec;
+ private final Channel channel;
+
+ // We have to keep track of all "in flight" messages here
+ private final Map<Integer, RequestInformation> requests;
+ private final AtomicInteger transactionCounter = new AtomicInteger(0);
+
+ public PlcConnectionImpl(PlcDriver driver, PlcApiCodec codec, Channel channel) {
+ this.driver = driver;
+ this.codec = codec;
+ this.channel = channel;
+ requests = new ConcurrentHashMap<>(); // Better to be thread safe here
+ // Register this object with the Codec
+ this.codec.registerConnection(this);
+ }
+
+ @Override public Future<PlcResponse> read(String fieldQuery) {
+ // TODO do we parse here or in the protocol??
+ // Prepare the container here
+ final int transactionId = transactionCounter.getAndIncrement();
+ final ReadRequest request = new ReadRequest(transactionId, fieldQuery);
+ // Create Request Information Object
+ final RequestInformation information = new RequestInformation(transactionId);
+ information.setRequest(request);
+ information.setStatus(RequestInformation.Status.PREPARING);
+ // Register Request
+ requests.put(transactionId, information);
+
+ channel.pipeline().write(request).addListener(future -> {
+ if (future.isSuccess()) {
+ information.setStatus(RequestInformation.Status.SENT);
+ } else {
+ // TODO make convenience method here?
+ information.setStatus(RequestInformation.Status.FAILED);
+ information.setCause(future.cause());
+ information.getFuture().completeExceptionally(future.cause());
+ }
+ });
+ return information.getFuture();
+ }
+
+ @Override public <T> Future<PlcResponse> write(String fieldQuery, T value) {
+ // TODO use common base with #read()
+ throw new NotImplementedException("");
+ }
+
+ @Override public PlcBatchRequest startBatch(IsolationLevel level) {
+ throw new NotImplementedException("");
+ }
+
+ @Override public void respond(Response response) {
+ // Take the necessary action
+ assert requests.containsKey(response.getTransactionId());
+ final RequestInformation information = requests.get(response.getTransactionId());
+ if (response instanceof FailedRequest) {
+ information.getFuture().completeExceptionally(((FailedRequest) response).getCause());
+ } else {
+ throw new IllegalStateException("This type of response " + response.getClass() + " is not implemented");
+ }
+ }
+}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcDriver.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcDriver.java
index b9eb0d2..c553bba 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcDriver.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcDriver.java
@@ -1,11 +1,35 @@
package org.apache.plc4x.java.base.next;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.base.next.netty.TcpTransportFactory;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.util.concurrent.Future;
+/**
+ * Question: Is this a shared instance (Singleton) or one for each "connection",
+ * then we need a factory.
+ */
public interface PlcDriver {
boolean validate(String fieldQuery);
- Future<PlcConnection> connect();
+ Future<PlcConnection> connect(String connection) throws PlcConnectionException;
+
+ boolean accepts(String connection);
+
+ interface ConnectionParserFactory {
+
+ ConnectionParser parse(String connection) throws PlcConnectionException;
+
+ }
+
+ interface ConnectionParser {
+
+ PlcTransportFactory getTransport();
+
+ SocketAddress getSocketAddress();
+ }
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcDriverImpl.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcDriverImpl.java
new file mode 100644
index 0000000..bc568d4
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcDriverImpl.java
@@ -0,0 +1,42 @@
+package org.apache.plc4x.java.base.next;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+
+/**
+ * Base class for the Drivers.
+ * All that a new Protocol has to override is the Parsing for the Fields (TODO general)
+ * and a specific instance of a Protocol.
+ */
+public abstract class PlcDriverImpl implements PlcDriver {
+
+ protected abstract ConnectionParserFactory parserFactory();
+
+ protected abstract PlcProtocol getProtocol();
+
+ @Override public Future<PlcConnection> connect(String connection) throws PlcConnectionException {
+ // Parse connection
+ final ConnectionParser parser = parserFactory().parse(connection);
+ // Prepare the Completable Future
+ final CompletableFuture<PlcConnection> returnFuture = new CompletableFuture<>();
+ // Establish the connection with callback for Async
+ final PlcTransportFactory transportFactory = parser.getTransport();
+ final ChannelFuture channelFuture = transportFactory.connect(getProtocol(), parser.getSocketAddress());
+ channelFuture.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
+ @Override public void operationComplete(io.netty.util.concurrent.Future<? super Void> f) throws Exception {
+ // If successful, return the new Connection instance; if not, complete the future exceptionally
+ if (f.isSuccess()) {
+ returnFuture.complete(new PlcConnectionImpl(PlcDriverImpl.this, transportFactory.getCodec(), ((ChannelFuture) f).channel()));
+ } else {
+ returnFuture.completeExceptionally(f.cause());
+ }
+ }
+ });
+ return returnFuture;
+ }
+
+}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcDriverManager.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcDriverManager.java
index 2db4efe..7e3c7ad 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcDriverManager.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcDriverManager.java
@@ -1,11 +1,12 @@
package org.apache.plc4x.java.base.next;
+import java.util.Optional;
import java.util.concurrent.Future;
public interface PlcDriverManager {
Future<PlcConnection> connect(String connection);
- PlcDriver getDriver(String connection);
+ Optional<PlcDriver> getDriver(String connection);
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcDriverManagerImpl.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcDriverManagerImpl.java
new file mode 100644
index 0000000..52b5c2d
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcDriverManagerImpl.java
@@ -0,0 +1,48 @@
+package org.apache.plc4x.java.base.next;
+
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.Future;
+
+public class PlcDriverManagerImpl implements PlcDriverManager {
+
+ private static final Logger logger = LoggerFactory.getLogger(PlcDriverManagerImpl.class);
+
+ private final Collection<PlcDriver> regisgtered;
+
+ public PlcDriverManagerImpl() {
+ // TODO Implement this
+ regisgtered = null;
+ }
+
+ /** For testing **/
+ PlcDriverManagerImpl(Collection<PlcDriver> regisgtered) {
+ this.regisgtered = regisgtered;
+ }
+
+ @Override public Future<PlcConnection> connect(String connection) {
+ return getDriver(connection)
+ .map(drv -> {
+ try {
+ return drv.connect(connection);
+ } catch (PlcConnectionException e) {
+ throw new PlcRuntimeException(e);
+ }
+ })
+ .orElseThrow(() -> new RuntimeException("No suitable driver found for connection ‘" + connection + "‘"));
+ }
+
+ @Override public Optional<PlcDriver> getDriver(String connection) {
+ for (PlcDriver driver : regisgtered) {
+ if (driver.accepts(connection)) {
+ return Optional.of(driver);
+ }
+ }
+ return Optional.empty();
+ }
+}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcExecutor.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcExecutor.java
new file mode 100644
index 0000000..0b57d68
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcExecutor.java
@@ -0,0 +1,10 @@
+package org.apache.plc4x.java.base.next;
+
+/**
+ * This class is the link between the API classes like
+ * {@link PlcDriver}, {@link PlcConnection} and so on, the Protocols and the Transport
+ * layer which is most likely always netty.
+ */
+public class PlcExecutor {
+
+}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcProtocol.java
new file mode 100644
index 0000000..a092753
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcProtocol.java
@@ -0,0 +1,27 @@
+package org.apache.plc4x.java.base.next;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.plc4x.java.base.next.commands.ReadRequest;
+import org.apache.plc4x.java.base.next.commands.Response;
+
+public interface PlcProtocol {
+
+ void init();
+
+ void close();
+
+ /**
+ * This method encodes a read request to bytes
+ */
+ void encode(ReadRequest command, ByteBuf out);
+
+ /**
+ * This method gets offered a buffer
+ * @throws UnableToParseException whenever there are not enough bytes to decode a response
+ */
+ Response decode(ByteBuf buf) throws UnableToParseException;
+
+ class UnableToParseException extends Exception {
+
+ }
+}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcReadRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcReadRequest.java
deleted file mode 100644
index a488880..0000000
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcReadRequest.java
+++ /dev/null
@@ -1,4 +0,0 @@
-package org.apache.plc4x.java.base.next;
-
-public class PlcReadRequest implements PlcRequest {
-}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcTransaction.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcTransaction.java
deleted file mode 100644
index c6b5d90..0000000
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcTransaction.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.apache.plc4x.java.base.next;
-
-import java.util.concurrent.Future;
-
-public interface PlcTransaction {
-
- PlcTransaction read(String alias, String fieldQuery);
-
- <T> PlcTransaction write(String alias, String fieldQuery, T value);
-
- Future<PlcTransactionResponse> execute();
-
-}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcTransportFactory.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcTransportFactory.java
new file mode 100644
index 0000000..a33b56c
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcTransportFactory.java
@@ -0,0 +1,88 @@
+package org.apache.plc4x.java.base.next;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.base.next.netty.PlcApiCodec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.SocketAddress;
+
+/**
+ * This represents a Transport layer which is used to generate a Netty Socket.
+ */
+public abstract class PlcTransportFactory {
+
+ private static final Logger logger = LoggerFactory.getLogger(PlcTransportFactory.class);
+
+ private PlcApiCodec codec;
+
+ /** It's necessary to have the codec to add a Callback as Handler **/
+ public PlcApiCodec getCodec() {
+ return codec;
+ }
+
+ /**
+ *
+ * This Method creates a new Channel and is only used internally by
+ * {@link org.apache.plc4x.java.base.next.PlcDriver}s.
+ *
+ * @param protocol Plc Protocol to use on this Connection
+ * @param address Socket Address to connect to
+ * @throws PlcConnectionException If no connection can be established
+ *
+ * <b>Important</b> This method is responsible for closing the {@link EventLoopGroup}s
+ * it creates!
+ */
+ ChannelFuture connect(PlcProtocol protocol, SocketAddress address) throws PlcConnectionException {
+ logger.debug("Trying to establish connection to {}", address);
+ try {
+ Bootstrap bootstrap = new Bootstrap();
+ final EventLoopGroup workerGroup = createEventLoopGroup();
+ bootstrap.group(workerGroup);
+ bootstrap.channel(getSocketClass());
+ // TODO we should use an explicit (configurable?) timeout here
+ // bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
+
+ // Instantiate codec
+ this.codec = new PlcApiCodec(protocol);
+
+ bootstrap.handler(new ChannelInitializer() {
+ @Override protected void initChannel(Channel ch) throws Exception {
+ ch.pipeline().addLast(codec);
+ }
+ });
+ // Start the client.
+ final ChannelFuture f = bootstrap.connect(address);
+ f.addListener(new GenericFutureListener<Future<? super Void>>() {
+ @Override public void operationComplete(Future<? super Void> future) throws Exception {
+ if (!future.isSuccess()) {
+ logger.info("Unable to connect, shutting down worker thread.");
+ workerGroup.shutdownGracefully();
+ }
+ }
+ });
+ return f;
+ } catch (Exception e) {
+ throw new PlcConnectionException("Error creating channel.", e);
+ }
+ }
+
+ /** Use NIO by default **/
+ protected EventLoopGroup createEventLoopGroup() {
+ return new NioEventLoopGroup();
+ }
+
+ protected abstract Class<? extends Channel> getSocketClass();
+
+ /** Can configure the bootstrap, if necessary **/
+ protected abstract void configure(Bootstrap bootstrap);
+
+}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcWriteRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcWriteRequest.java
deleted file mode 100644
index 0f525da..0000000
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/PlcWriteRequest.java
+++ /dev/null
@@ -1,4 +0,0 @@
-package org.apache.plc4x.java.base.next;
-
-public class PlcWriteRequest implements PlcRequest {
-}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/RequestInformation.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/RequestInformation.java
new file mode 100644
index 0000000..08e7508
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/RequestInformation.java
@@ -0,0 +1,69 @@
+package org.apache.plc4x.java.base.next;
+
+import org.apache.plc4x.java.base.next.commands.Message;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This is a Container object which holds information for in-flight requests.
+ * It also tracks the lifecycle of each request.
+ */
+public class RequestInformation {
+
+ private final int transactionId;
+ private final CompletableFuture<PlcResponse> future;
+ private Status status;
+ private Message request;
+ private Message response;
+ private Throwable cause;
+
+ public RequestInformation(int transactionId) {
+ this.transactionId = transactionId;
+ future = new CompletableFuture<>();
+ }
+
+ public void setRequest(Message request) {
+ this.request = request;
+ }
+
+ public void setResponse(Message response) {
+ this.response = response;
+ }
+
+ public void setCause(Throwable cause) {
+ this.cause = cause;
+ }
+
+ public Message getRequest() {
+ return request;
+ }
+
+ public Message getResponse() {
+ return response;
+ }
+
+ public Throwable getCause() {
+ return cause;
+ }
+
+ public CompletableFuture<PlcResponse> getFuture() {
+ return future;
+ }
+
+ public Status getStatus() {
+ return status;
+ }
+
+ public void setStatus(Status status) {
+ this.status = status;
+ }
+
+ enum Status {
+ PREPARING,
+ WAIT_TO_SEND,
+ SENT,
+ RESPONDED,
+ FINISHED,
+ FAILED
+ }
+}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/commands/FailedRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/commands/FailedRequest.java
new file mode 100644
index 0000000..00701f2
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/commands/FailedRequest.java
@@ -0,0 +1,15 @@
+package org.apache.plc4x.java.base.next.commands;
+
+public class FailedRequest extends Response{
+
+ private final Throwable cause;
+
+ protected FailedRequest(int transactionId, Throwable cause) {
+ super(transactionId);
+ this.cause = cause;
+ }
+
+ public Throwable getCause() {
+ return cause;
+ }
+}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/commands/Message.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/commands/Message.java
new file mode 100644
index 0000000..ea65359
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/commands/Message.java
@@ -0,0 +1,14 @@
+package org.apache.plc4x.java.base.next.commands;
+
+public abstract class Message {
+
+ private final int transactionId;
+
+ protected Message(int transactionId) {
+ this.transactionId = transactionId;
+ }
+
+ public int getTransactionId() {
+ return transactionId;
+ }
+}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/commands/ReadRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/commands/ReadRequest.java
new file mode 100644
index 0000000..19898bd
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/commands/ReadRequest.java
@@ -0,0 +1,19 @@
+package org.apache.plc4x.java.base.next.commands;
+
+public class ReadRequest extends Message {
+
+ private final String query;
+
+ public ReadRequest(int transactionId, String query) {
+ super(transactionId);
+ this.query = query;
+ }
+
+ public String getQuery() {
+ return query;
+ }
+
+ @Override public String toString() {
+ return super.toString();
+ }
+}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/commands/Response.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/commands/Response.java
new file mode 100644
index 0000000..0cdc7bc
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/commands/Response.java
@@ -0,0 +1,9 @@
+package org.apache.plc4x.java.base.next.commands;
+
+public class Response extends Message {
+
+ protected Response(int transactionId) {
+ super(transactionId);
+ }
+
+}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/commands/WriteRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/commands/WriteRequest.java
new file mode 100644
index 0000000..8a1ec76
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/commands/WriteRequest.java
@@ -0,0 +1,8 @@
+package org.apache.plc4x.java.base.next.commands;
+
+public class WriteRequest extends Message {
+
+ protected WriteRequest(int transactionId) {
+ super(transactionId);
+ }
+}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/netty/PlcApiCodec.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/netty/PlcApiCodec.java
new file mode 100644
index 0000000..81a4cef
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/netty/PlcApiCodec.java
@@ -0,0 +1,94 @@
+package org.apache.plc4x.java.base.next.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.plc4x.java.base.next.PlcConnection;
+import org.apache.plc4x.java.base.next.PlcConnectionImpl;
+import org.apache.plc4x.java.base.next.PlcProtocol;
+import org.apache.plc4x.java.base.next.commands.ReadRequest;
+import org.apache.plc4x.java.base.next.commands.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is the binding element between a {@link org.apache.plc4x.java.base.next.PlcProtocol},
+ * netty and the High Level API classes like {@link org.apache.plc4x.java.base.next.PlcConnection}.
+ */
+public class PlcApiCodec extends ChannelDuplexHandler {
+
+ private static final Logger logger = LoggerFactory.getLogger(PlcApiCodec.class);
+
+ private final PlcProtocol protocol;
+
+ // We have to keep track of all "in flight" messages here
+ private PlcConnection plcConnection;
+
+ // TODO perhaps factory or class name?
+ // TODO Perhaps connection layer here?
+ public PlcApiCodec(PlcProtocol protocol) {
+ this.protocol = protocol;
+ }
+
+ @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+ this.protocol.init();
+ }
+
+ @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+ this.protocol.close();
+ }
+
+ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ // ?
+ }
+
+ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ // ?
+ }
+
+ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+ // ?
+ }
+
+ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+ // ?
+ }
+
+ @Override public void write(ChannelHandlerContext ctx, Object command, ChannelPromise promise) throws Exception {
+ // Check exact message type and forward
+ if (command instanceof ReadRequest) {
+ logger.debug("Received read request {}", command);
+ final ByteBuf buffer = Unpooled.buffer();
+ protocol.encode(((ReadRequest) command), buffer);
+ ctx.writeAndFlush(buffer);
+ } else {
+ throw new IllegalStateException("The Command " + command.getClass() + " is not implemented!");
+ }
+ }
+
+ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ final ByteBuf buf = (ByteBuf) msg;
+ logger.debug("Received message\n"
+ + ByteBufUtil.prettyHexDump(buf));
+ buf.markReaderIndex();
+ try {
+ Response response = protocol.decode(buf);
+ this.plcConnection.respond(response);
+ } catch (PlcProtocol.UnableToParseException e) {
+ buf.resetReaderIndex();
+ }
+ ReferenceCountUtil.release(msg);
+ }
+
+ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ // General Exception handling!
+ }
+
+ public void registerConnection(PlcConnectionImpl plcConnection) {
+ this.plcConnection = plcConnection;
+ }
+}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/netty/TcpTransportFactory.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/netty/TcpTransportFactory.java
new file mode 100644
index 0000000..3dd5e1e
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/next/netty/TcpTransportFactory.java
@@ -0,0 +1,20 @@
+package org.apache.plc4x.java.base.next.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.plc4x.java.base.next.PlcTransportFactory;
+
+public class TcpTransportFactory extends PlcTransportFactory {
+
+ @Override protected Class<? extends Channel> getSocketClass() {
+ return NioSocketChannel.class;
+ }
+
+ @Override protected void configure(Bootstrap bootstrap) {
+ bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
+ bootstrap.option(ChannelOption.TCP_NODELAY, true);
+ }
+
+}
diff --git a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/next/PlcDriverManagerImplTest.java b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/next/PlcDriverManagerImplTest.java
new file mode 100644
index 0000000..6ba5b84
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/next/PlcDriverManagerImplTest.java
@@ -0,0 +1,84 @@
+package org.apache.plc4x.java.base.next;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.base.next.commands.ReadRequest;
+import org.apache.plc4x.java.base.next.commands.Response;
+import org.apache.plc4x.java.base.next.netty.TcpTransportFactory;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+public class PlcDriverManagerImplTest {
+
+ @Test
+ public void connect() throws PlcConnectionException, ExecutionException, InterruptedException {
+ final PlcDriverManagerImpl driverManager = new PlcDriverManagerImpl(Collections.singletonList(new TestDriver()));
+ final Optional<PlcDriver> driver = driverManager.getDriver("Hallo");
+ assert driver.isPresent();
+ final Future<PlcConnection> connectFuture = driver.get().connect("Hallo");
+ final PlcConnection connection = connectFuture.get();
+ final Future<PlcResponse> readFuture = connection.read("asdf");
+ final PlcResponse response = readFuture.get();
+
+ System.out.println("Finished...");
+ }
+
+ private static class TestDriver extends PlcDriverImpl {
+
+ @Override protected ConnectionParserFactory parserFactory() {
+ return new TestParserFactory();
+ }
+
+ @Override protected PlcProtocol getProtocol() {
+ return new TestProtocol();
+ }
+
+ @Override public boolean validate(String fieldQuery) {
+ return true;
+ }
+
+ @Override public boolean accepts(String connection) {
+ return true;
+ }
+
+ private static class TestProtocol implements PlcProtocol {
+ @Override public void init() {
+
+ }
+
+ @Override public void close() {
+
+ }
+
+ @Override public void encode(ReadRequest command, ByteBuf out) {
+ out.writeBytes(command.getQuery().getBytes());
+ }
+
+ @Override public Response decode(ByteBuf buf) throws UnableToParseException {
+ throw new UnableToParseException();
+ }
+ }
+
+ private static class TestParserFactory implements ConnectionParserFactory {
+ @Override public ConnectionParser parse(String connection) throws PlcConnectionException {
+ return new TestParser();
+ }
+
+ private static class TestParser implements ConnectionParser {
+ @Override public PlcTransportFactory getTransport() {
+ return new TcpTransportFactory();
+ }
+
+ @Override public SocketAddress getSocketAddress() {
+ return new InetSocketAddress("localhost", 1234);
+ }
+ }
+ }
+ }
+}
\ No newline at end of file