You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@plc4x.apache.org by GitBox <gi...@apache.org> on 2018/10/18 09:31:43 UTC

[GitHub] asfgit closed pull request #27: API Refactoring: add execute operation to requests, extract SPI package

asfgit closed pull request #27: API Refactoring: add execute operation to requests, extract SPI package
URL: https://github.com/apache/incubator-plc4x/pull/27
 
 
   

This pull request was merged from a forked repository. Because GitHub
hides the original diff of a pull request from a fork once it has been
merged, the diff is reproduced below for the sake of provenance:

diff --git a/examples/azure/src/main/java/org/apache/plc4x/java/examples/azure/iothub/S7PlcToAzureIoTHubSample.java b/examples/azure/src/main/java/org/apache/plc4x/java/examples/azure/iothub/S7PlcToAzureIoTHubSample.java
index 728e5024b..a9994be1d 100644
--- a/examples/azure/src/main/java/org/apache/plc4x/java/examples/azure/iothub/S7PlcToAzureIoTHubSample.java
+++ b/examples/azure/src/main/java/org/apache/plc4x/java/examples/azure/iothub/S7PlcToAzureIoTHubSample.java
@@ -22,8 +22,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import com.microsoft.azure.sdk.iot.device.IotHubClientProtocol;
 import com.microsoft.azure.sdk.iot.device.Message;
 import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcReader;
+import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
 import org.slf4j.Logger;
@@ -62,15 +61,13 @@ public static void main(String[] args) throws Exception {
             DeviceClient client = new DeviceClient(iotConnectionString, IotHubClientProtocol.MQTT);
             client.open();
 
-            // Get a reader instance.
-            PlcReader plcReader = plcConnection.getReader().orElseThrow(IllegalStateException::new);
 
             // Prepare a read request.
-            PlcReadRequest request = plcReader.readRequestBuilder().addItem(FIELD_NAME, addressString).build();
+            PlcReadRequest request = plcConnection.readRequestBuilder().get().addItem(FIELD_NAME, addressString).build();
 
             while (!Thread.currentThread().isInterrupted()) {
                 // Simulate telemetry.
-                PlcReadResponse<?> response = plcReader.read(request).get();
+                PlcReadResponse response = request.execute().get();
                 response.getAllLongs(FIELD_NAME)
                     .forEach(longValue -> {
                             String result = Long.toBinaryString(longValue);
diff --git a/examples/dummy-driver/src/main/java/org/apache/plc4x/java/examples/dummydriver/DummyDriver.java b/examples/dummy-driver/src/main/java/org/apache/plc4x/java/examples/dummydriver/DummyDriver.java
index c8ab21a1a..d9e182b63 100644
--- a/examples/dummy-driver/src/main/java/org/apache/plc4x/java/examples/dummydriver/DummyDriver.java
+++ b/examples/dummy-driver/src/main/java/org/apache/plc4x/java/examples/dummydriver/DummyDriver.java
@@ -18,9 +18,9 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.java.examples.dummydriver;
 
-import org.apache.plc4x.java.api.PlcDriver;
+import org.apache.plc4x.java.spi.PlcDriver;
 import org.apache.plc4x.java.api.authentication.PlcAuthentication;
-import org.apache.plc4x.java.api.connection.PlcConnection;
+import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.examples.dummydriver.connection.DummyConnection;
 
diff --git a/examples/dummy-driver/src/main/java/org/apache/plc4x/java/examples/dummydriver/connection/DummyConnection.java b/examples/dummy-driver/src/main/java/org/apache/plc4x/java/examples/dummydriver/connection/DummyConnection.java
index c448873d5..c5721caa9 100644
--- a/examples/dummy-driver/src/main/java/org/apache/plc4x/java/examples/dummydriver/connection/DummyConnection.java
+++ b/examples/dummy-driver/src/main/java/org/apache/plc4x/java/examples/dummydriver/connection/DummyConnection.java
@@ -21,12 +21,9 @@ Licensed to the Apache Software Foundation (ASF) under one
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelInitializer;
-import org.apache.plc4x.java.api.connection.PlcReader;
-import org.apache.plc4x.java.api.connection.PlcWriter;
-import org.apache.plc4x.java.api.messages.PlcReadRequest;
-import org.apache.plc4x.java.api.messages.PlcReadResponse;
-import org.apache.plc4x.java.api.messages.PlcWriteRequest;
-import org.apache.plc4x.java.api.messages.PlcWriteResponse;
+import org.apache.plc4x.java.base.messages.PlcReader;
+import org.apache.plc4x.java.base.messages.PlcWriter;
+import org.apache.plc4x.java.api.messages.*;
 import org.apache.plc4x.java.base.connection.AbstractPlcConnection;
 import org.apache.plc4x.java.base.connection.TcpSocketChannelFactory;
 import org.apache.plc4x.java.base.messages.*;
@@ -34,6 +31,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.slf4j.LoggerFactory;
 
 import java.net.InetAddress;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 public class DummyConnection extends AbstractPlcConnection implements PlcReader, PlcWriter {
@@ -58,13 +56,29 @@ protected void initChannel(Channel channel) {
     }
 
     @Override
-    public PlcReadRequest.Builder readRequestBuilder() {
+    public Optional<PlcReadRequest.Builder> readRequestBuilder() {
         // TODO: Implement this ...
-        return null;
+        return Optional.empty();
     }
 
     @Override
-    public CompletableFuture<PlcReadResponse<?>> read(PlcReadRequest readRequest) {
+    public Optional<PlcWriteRequest.Builder> writeRequestBuilder() {
+        // TODO: Implement this ...
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<PlcSubscriptionRequest.Builder> subscriptionRequestBuilder() {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<PlcUnsubscriptionRequest.Builder> unsubscriptionRequestBuilder() {
+        return Optional.empty();
+    }
+
+    @Override
+    public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) {
         CompletableFuture<InternalPlcReadResponse> readFuture = new CompletableFuture<>();
         PlcRequestContainer<InternalPlcReadRequest, InternalPlcReadResponse> container =
             new PlcRequestContainer<>((InternalPlcReadRequest) readRequest, readFuture);
@@ -74,13 +88,7 @@ protected void initChannel(Channel channel) {
     }
 
     @Override
-    public PlcWriteRequest.Builder writeRequestBuilder() {
-        // TODO: Implement this ...
-        return null;
-    }
-
-    @Override
-    public CompletableFuture<PlcWriteResponse<?>> write(PlcWriteRequest writeRequest) {
+    public CompletableFuture<PlcWriteResponse> write(PlcWriteRequest writeRequest) {
         CompletableFuture<InternalPlcWriteResponse> writeFuture = new CompletableFuture<>();
         PlcRequestContainer<InternalPlcWriteRequest, InternalPlcWriteResponse> container =
             new PlcRequestContainer<>((InternalPlcWriteRequest) writeRequest, writeFuture);
diff --git a/examples/dummy-driver/src/main/resources/META-INF/services/org.apache.plc4x.java.api.PlcDriver b/examples/dummy-driver/src/main/resources/META-INF/services/org.apache.plc4x.java.spi.PlcDriver
similarity index 100%
rename from examples/dummy-driver/src/main/resources/META-INF/services/org.apache.plc4x.java.api.PlcDriver
rename to examples/dummy-driver/src/main/resources/META-INF/services/org.apache.plc4x.java.spi.PlcDriver
diff --git a/examples/google/src/main/java/org/apache/plc4x/java/examples/google/iotcore/S7PlcToGoogleIoTCoreSample.java b/examples/google/src/main/java/org/apache/plc4x/java/examples/google/iotcore/S7PlcToGoogleIoTCoreSample.java
index 48b2d6a3f..0bb8b0ebb 100644
--- a/examples/google/src/main/java/org/apache/plc4x/java/examples/google/iotcore/S7PlcToGoogleIoTCoreSample.java
+++ b/examples/google/src/main/java/org/apache/plc4x/java/examples/google/iotcore/S7PlcToGoogleIoTCoreSample.java
@@ -22,8 +22,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import io.jsonwebtoken.Jwts;
 import io.jsonwebtoken.SignatureAlgorithm;
 import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcReader;
+import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
 import org.eclipse.paho.client.mqttv3.*;
@@ -233,13 +232,11 @@ public static void main(String[] args) throws Exception {
         try (PlcConnection plcConnection = new PlcDriverManager().getConnection("s7://10.10.64.20/1/1")) {
             logger.info("Connected");
 
-            PlcReader plcReader = plcConnection.getReader().orElseThrow(IllegalAccessError::new);
-
-            PlcReadRequest readRequest = plcReader.readRequestBuilder().addItem("outputs", "OUTPUTS/0").build();
+            PlcReadRequest readRequest = plcConnection.readRequestBuilder().get().addItem("outputs", "OUTPUTS/0").build();
 
             while (!Thread.currentThread().isInterrupted()) {
 
-                PlcReadResponse<?> plcReadResponse = plcReader.read(readRequest).get();
+                PlcReadResponse plcReadResponse = readRequest.execute().get();
 
                 // Refresh the connection credentials before the JWT expires.
                 // [START iot_mqtt_jwt_refresh]
diff --git a/examples/hello-plc4x/src/main/java/org/apache/plc4x/java/examples/helloplc4x/HelloPlc4x.java b/examples/hello-plc4x/src/main/java/org/apache/plc4x/java/examples/helloplc4x/HelloPlc4x.java
index e0784349f..f8928dcd3 100644
--- a/examples/hello-plc4x/src/main/java/org/apache/plc4x/java/examples/helloplc4x/HelloPlc4x.java
+++ b/examples/hello-plc4x/src/main/java/org/apache/plc4x/java/examples/helloplc4x/HelloPlc4x.java
@@ -19,15 +19,13 @@ Licensed to the Apache Software Foundation (ASF) under one
 package org.apache.plc4x.java.examples.helloplc4x;
 
 import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcReader;
+import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
 import org.apache.plc4x.java.api.types.PlcResponseCode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 public class HelloPlc4x {
@@ -49,34 +47,36 @@ public static void main(String[] args) {
         // Establish a connection to the plc using the url provided as first argument
         try (PlcConnection plcConnection = new PlcDriverManager().getConnection(args[0])) {
 
-            Optional<PlcReader> reader = plcConnection.getReader();
-
             // Check if this connection support reading of data.
-            if (reader.isPresent()) {
-                PlcReader plcReader = reader.get();
+            if (plcConnection.readRequestBuilder().isPresent()) {
 
                 // Create a new read request:
                 // - Give the single item requested the alias name "value"
-                PlcReadRequest.Builder builder = plcReader.readRequestBuilder();
+                PlcReadRequest.Builder syncBuilder = plcConnection.readRequestBuilder().get();
                 for (int i = 1; i < args.length; i++) {
-                    builder.addItem("value-" + i, args[i]);
+                    syncBuilder.addItem("value-" + i, args[i]);
                 }
-                PlcReadRequest plcReadRequest = builder.build();
+                PlcReadRequest syncPlcReadRequest = syncBuilder.build();
 
                 //////////////////////////////////////////////////////////
                 // Read synchronously ...
                 // NOTICE: the ".get()" immediately lets this thread pause till
                 // the response is processed and available.
                 System.out.println("\nSynchronous request ...");
-                PlcReadResponse<?> syncResponse = plcReader.read(plcReadRequest).get();
+                PlcReadResponse syncResponse = syncPlcReadRequest.execute().get();
                 // Simply iterating over the field names returned in the response.
                 printResponse(syncResponse);
 
                 //////////////////////////////////////////////////////////
                 // Read asynchronously ...
                 // Register a callback executed as soon as a response arives.
+                PlcReadRequest.Builder asyncBuilder = plcConnection.readRequestBuilder().get();
+                for (int i = 1; i < args.length; i++) {
+                    asyncBuilder.addItem("value-" + i, args[i]);
+                }
+                PlcReadRequest asyncPlcReadRequest = asyncBuilder.build();
                 System.out.println("\n\nAsynchronous request ...");
-                CompletableFuture<PlcReadResponse<?>> asyncResponse = plcReader.read(plcReadRequest);
+                CompletableFuture<? extends PlcReadResponse> asyncResponse = asyncPlcReadRequest.execute();
                 asyncResponse.whenComplete((readResponse, throwable) -> {
                     if (readResponse != null) {
                         printResponse(syncResponse);
@@ -94,7 +94,7 @@ public static void main(String[] args) {
         }
     }
 
-    private static void printResponse(PlcReadResponse<?> syncResponse) {
+    private static void printResponse(PlcReadResponse syncResponse) {
         for (String fieldName : syncResponse.getFieldNames()) {
             if(syncResponse.getResponseCode(fieldName) == PlcResponseCode.OK) {
                 int numValues = syncResponse.getNumberOfValues(fieldName);
diff --git a/examples/kafka-bridge/src/main/java/org/apache/plc4x/java/examples/kafkabridge/KafkaBridge.java b/examples/kafka-bridge/src/main/java/org/apache/plc4x/java/examples/kafkabridge/KafkaBridge.java
index 730fad478..b822aac56 100644
--- a/examples/kafka-bridge/src/main/java/org/apache/plc4x/java/examples/kafkabridge/KafkaBridge.java
+++ b/examples/kafka-bridge/src/main/java/org/apache/plc4x/java/examples/kafkabridge/KafkaBridge.java
@@ -86,10 +86,10 @@ private void run() throws PlcException {
         PlcReadRequest readRequest = builder.build();
 
         // Create a supplier that is able to read the batch we just created.
-        Supplier<PlcReadResponse<?>> plcSupplier = PlcFunctions.batchSupplier(plcAdapter, readRequest);
+        Supplier<PlcReadResponse> plcSupplier = PlcFunctions.batchSupplier(plcAdapter, readRequest);
 
         // Start polling our plc source in the given interval.
-        TStream<PlcReadResponse<?>> source = top.poll(plcSupplier, config.getPollingInterval(), TimeUnit.MILLISECONDS);
+        TStream<PlcReadResponse> source = top.poll(plcSupplier, config.getPollingInterval(), TimeUnit.MILLISECONDS);
 
         // Convert the byte into a string.
         TStream<String> jsonSource = source.map(value -> {
diff --git a/examples/plclogger/src/main/java/org/apache/plc4x/java/examples/plclogger/PlcLogger.java b/examples/plclogger/src/main/java/org/apache/plc4x/java/examples/plclogger/PlcLogger.java
index ffc14ec37..bd2a0660b 100644
--- a/examples/plclogger/src/main/java/org/apache/plc4x/java/examples/plclogger/PlcLogger.java
+++ b/examples/plclogger/src/main/java/org/apache/plc4x/java/examples/plclogger/PlcLogger.java
@@ -18,8 +18,6 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.java.examples.plclogger;
 
-import java.util.concurrent.TimeUnit;
-
 import org.apache.edgent.function.Supplier;
 import org.apache.edgent.providers.direct.DirectProvider;
 import org.apache.edgent.topology.TStream;
@@ -27,6 +25,8 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.plc4x.edgent.PlcConnectionAdapter;
 import org.apache.plc4x.edgent.PlcFunctions;
 
+import java.util.concurrent.TimeUnit;
+
 public class PlcLogger {
 
     public static void main(String[] args) throws Exception {
diff --git a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
index 42544f593..23dd0eb9d 100644
--- a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
+++ b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
@@ -23,13 +23,9 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.camel.support.LoggingExceptionHandler;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorConverterHelper;
-import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcSubscriber;
+import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcException;
-import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
-import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
-import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
-import org.apache.plc4x.java.api.messages.PlcUnsubscriptionResponse;
+import org.apache.plc4x.java.api.messages.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,7 +48,6 @@ Licensed to the Apache Software Foundation (ASF) under one
     private Class<?> dataType;
     private PlcSubscriptionResponse subscriptionResponse;
 
-
     public Plc4XConsumer(Plc4XEndpoint endpoint, Processor processor) throws PlcException {
         this.endpoint = endpoint;
         this.dataType = endpoint.getDataType();
@@ -83,19 +78,19 @@ public void setExceptionHandler(ExceptionHandler exceptionHandler) {
 
     @Override
     protected void doStart() throws InterruptedException, ExecutionException, PlcException {
-        PlcSubscriber plcSubscriber = plcConnection.getSubscriber().orElseThrow(
-            () -> new PlcException("Connection doesn't support subscriptions."));
         // TODO: Is it correct to only support one field?
-        PlcSubscriptionRequest request = plcSubscriber.subscriptionRequestBuilder()
+        PlcSubscriptionRequest request = plcConnection.subscriptionRequestBuilder().get()
             .addCyclicField("default", fieldQuery, Duration.of(3, ChronoUnit.SECONDS)).build();
-        plcSubscriber.register(request, this);
+        subscriptionResponse = request.execute().get();
+        // TODO: we need to return the plcSubscriptionResponse here too as we need this to unsubscribe...
+        // TODO: figure out what to do with this
+        // plcSubscriber.register(this, plcSubscriptionResponse.getSubscriptionHandles());
     }
 
     @Override
     protected void doStop() throws InterruptedException, ExecutionException, TimeoutException, PlcException {
-        PlcSubscriber plcSubscriber = plcConnection.getSubscriber().orElseThrow(
-            () -> new PlcException("Connection doesn't support subscriptions."));
-        CompletableFuture<PlcUnsubscriptionResponse> unsubscriptionFuture = plcSubscriber.unsubscribe(builder -> builder.addHandles(subscriptionResponse.getSubscriptionHandles()));
+        PlcUnsubscriptionRequest request = plcConnection.unsubscriptionRequestBuilder().get().addHandles(subscriptionResponse.getSubscriptionHandles()).build();
+        CompletableFuture<? extends PlcUnsubscriptionResponse> unsubscriptionFuture = request.execute();
         PlcUnsubscriptionResponse unsubscriptionResponse = unsubscriptionFuture.get(5, TimeUnit.SECONDS);
         // TODO: Handle the response ...
         try {
@@ -105,10 +100,6 @@ protected void doStop() throws InterruptedException, ExecutionException, Timeout
         }
     }
 
-    private PlcSubscriber getSubscriber() {
-        return plcConnection.getSubscriber().orElseThrow(() -> new RuntimeException("No subscriber available"));
-    }
-
     @Override
     public void accept(PlcSubscriptionEvent plcSubscriptionEvent) {
         LOGGER.debug("Received {}", plcSubscriptionEvent);
diff --git a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XPollingConsumer.java b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XPollingConsumer.java
index 395398e27..ef2af6edc 100644
--- a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XPollingConsumer.java
+++ b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XPollingConsumer.java
@@ -24,8 +24,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.support.LoggingExceptionHandler;
 import org.apache.camel.support.ServiceSupport;
-import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcReader;
+import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcException;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
@@ -44,8 +43,7 @@ Licensed to the Apache Software Foundation (ASF) under one
     private Plc4XEndpoint endpoint;
     private ExceptionHandler exceptionHandler;
     private PlcConnection plcConnection;
-    private PlcReader plcReader;
-    private PlcReadRequest readRequest;
+    private PlcReadRequest.Builder requestBuilder;
     private Class dataType;
 
     public Plc4XPollingConsumer(Plc4XEndpoint endpoint) throws PlcException {
@@ -54,8 +52,7 @@ public Plc4XPollingConsumer(Plc4XEndpoint endpoint) throws PlcException {
         this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass());
         String plc4xURI = endpoint.getEndpointUri().replaceFirst("plc4x:/?/?", "");
         this.plcConnection = endpoint.getPlcDriverManager().getConnection(plc4xURI);
-        this.plcReader = plcConnection.getReader().orElseThrow(() -> new PlcException("This connection doesn't support reading."));
-        readRequest = plcReader.readRequestBuilder().addItem("default", endpoint.getAddress()).build();
+        this.requestBuilder = plcConnection.readRequestBuilder().orElseThrow(() -> new PlcException("This connection doesn't support reading."));
     }
 
     @Override
@@ -79,7 +76,7 @@ public void setExceptionHandler(ExceptionHandler exceptionHandler) {
     @Override
     public Exchange receive() {
         Exchange exchange = endpoint.createExchange();
-        CompletableFuture<? extends PlcReadResponse> read = plcReader.read(readRequest);
+        CompletableFuture<? extends PlcReadResponse> read = createReadRequest().execute();
         try {
             PlcReadResponse plcReadResponse = read.get();
             exchange.getIn().setBody(unwrapIfSingle(plcReadResponse.getAllObjects("default")));
@@ -97,7 +94,7 @@ public Exchange receiveNoWait() {
     @Override
     public Exchange receive(long timeout) {
         Exchange exchange = endpoint.createExchange();
-        CompletableFuture<? extends PlcReadResponse> read = plcReader.read(readRequest);
+        CompletableFuture<? extends PlcReadResponse> read = createReadRequest().execute();
         try {
             PlcReadResponse plcReadResponse = read.get(timeout, TimeUnit.MILLISECONDS);
             exchange.getIn().setBody(unwrapIfSingle(plcReadResponse.getAllObjects("default")));
@@ -121,6 +118,10 @@ protected void doStop() {
         }
     }
 
+    private PlcReadRequest createReadRequest() {
+        return requestBuilder.addItem("default", endpoint.getAddress()).build();
+    }
+
     private Object unwrapIfSingle(Collection collection) {
         if (collection.isEmpty()) {
             return null;
diff --git a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XProducer.java b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XProducer.java
index 01acff05a..b737a638d 100644
--- a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XProducer.java
+++ b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XProducer.java
@@ -22,8 +22,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.impl.DefaultAsyncProducer;
-import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcWriter;
+import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcException;
 import org.apache.plc4x.java.api.messages.PlcWriteRequest;
 import org.apache.plc4x.java.api.messages.PlcWriteResponse;
@@ -34,14 +33,15 @@ Licensed to the Apache Software Foundation (ASF) under one
 
 public class Plc4XProducer extends DefaultAsyncProducer {
     private PlcConnection plcConnection;
-    private PlcWriter plcWriter;
     private AtomicInteger openRequests;
 
     public Plc4XProducer(Plc4XEndpoint endpoint) throws PlcException {
         super(endpoint);
         String plc4xURI = endpoint.getEndpointUri().replaceFirst("plc4x:/?/?", "");
         plcConnection = endpoint.getPlcDriverManager().getConnection(plc4xURI);
-        plcWriter = plcConnection.getWriter().orElseThrow(() -> new PlcException("This connection doesn't support writing."));
+        if (!plcConnection.writeRequestBuilder().isPresent()) {
+            throw new PlcException("This connection (" + plc4xURI + ") doesn't support writing.");
+        }
         openRequests = new AtomicInteger();
     }
 
@@ -51,7 +51,6 @@ public void process(Exchange exchange) throws Exception {
         String fieldName = in.getHeader(Constants.FIELD_NAME_HEADER, String.class);
         String fieldQuery = in.getHeader(Constants.FIELD_QUERY_HEADER, String.class);
         Object body = in.getBody();
-        PlcWriteRequest.Builder builder = plcWriter.writeRequestBuilder();
         if (body instanceof List) {
             List<?> bodyList = in.getBody(List.class);
             Object[] values = bodyList.toArray();
@@ -60,8 +59,8 @@ public void process(Exchange exchange) throws Exception {
             Object value = in.getBody(Object.class);
 //            builder.addItem(fieldName, fieldQuery, value);
         }
-        PlcWriter plcWriter = plcConnection.getWriter().orElseThrow(() -> new IllegalArgumentException("Writer for driver not found"));
-        CompletableFuture<? extends PlcWriteResponse> completableFuture = plcWriter.write(builder.build());
+        PlcWriteRequest.Builder builder = plcConnection.writeRequestBuilder().orElseThrow(() -> new IllegalArgumentException("Writer for driver not found"));
+        CompletableFuture<? extends PlcWriteResponse> completableFuture = builder.build().execute();
         int currentlyOpenRequests = openRequests.incrementAndGet();
         try {
             log.debug("Currently open requests including {}:{}", exchange, currentlyOpenRequests);
diff --git a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java
index fb99abe05..a34f93d5f 100644
--- a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java
+++ b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java
@@ -18,13 +18,11 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.camel;
 
-import org.apache.plc4x.java.api.PlcDriver;
+import org.apache.plc4x.java.spi.PlcDriver;
 import org.apache.plc4x.java.api.authentication.PlcAuthentication;
-import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcSubscriber;
-import org.apache.plc4x.java.api.connection.PlcWriter;
-import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
-import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
+import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.base.messages.PlcSubscriber;
+import org.apache.plc4x.java.api.messages.*;
 import org.apache.plc4x.java.base.messages.DefaultPlcSubscriptionResponse;
 import org.apache.plc4x.java.base.messages.InternalPlcSubscriptionRequest;
 import org.slf4j.Logger;
@@ -58,7 +56,10 @@ public String getProtocolName() {
     public PlcConnection connect(String url) {
         // Mock a connection.
         PlcConnection plcConnectionMock = mock(PlcConnection.class, RETURNS_DEEP_STUBS);
-        when(plcConnectionMock.getWriter()).thenReturn(Optional.of(mock(PlcWriter.class, RETURNS_DEEP_STUBS)));
+        when(plcConnectionMock.readRequestBuilder()).thenReturn(Optional.of(mock(PlcReadRequest.Builder.class, RETURNS_DEEP_STUBS)));
+        when(plcConnectionMock.writeRequestBuilder()).thenReturn(Optional.of(mock(PlcWriteRequest.Builder.class, RETURNS_DEEP_STUBS)));
+        when(plcConnectionMock.subscriptionRequestBuilder()).thenReturn(Optional.of(mock(PlcSubscriptionRequest.Builder.class, RETURNS_DEEP_STUBS)));
+        when(plcConnectionMock.unsubscriptionRequestBuilder()).thenReturn(Optional.of(mock(PlcUnsubscriptionRequest.Builder.class, RETURNS_DEEP_STUBS)));
 
         // Mock a typical subscriber.
         PlcSubscriber plcSubscriber = mock(PlcSubscriber.class, RETURNS_DEEP_STUBS);
@@ -90,7 +91,6 @@ public PlcConnection connect(String url) {
             responseFuture.complete(response);
             return responseFuture;
         });
-        when(plcConnectionMock.getSubscriber()).thenReturn(Optional.of(plcSubscriber));
         return plcConnectionMock;
     }
 
diff --git a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XEndpointTest.java b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XEndpointTest.java
index d29b14814..b849e2ca3 100644
--- a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XEndpointTest.java
+++ b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XEndpointTest.java
@@ -37,6 +37,7 @@ public void setUp() {
         SUT = new Plc4XEndpoint("plc4x:mock:10.10.10.1/1/1", mock(Component.class));
     }
 
+    // TODO: figure out what this is
     @Test
     public void createProducer() throws Exception {
         assertThat(SUT.createProducer(), notNullValue());
diff --git a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XProducerTest.java b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XProducerTest.java
index f35ce3d0b..f51a43de1 100644
--- a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XProducerTest.java
+++ b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XProducerTest.java
@@ -21,8 +21,8 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcWriter;
+import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.messages.PlcWriteRequest;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -45,8 +45,10 @@ public void setUp() throws Exception {
         Plc4XEndpoint endpointMock = mock(Plc4XEndpoint.class, RETURNS_DEEP_STUBS);
         when(endpointMock.getEndpointUri()).thenReturn("plc4x:mock:10.10.10.1/1/1");
         PlcDriverManager plcDriverManagerMock = mock(PlcDriverManager.class, RETURNS_DEEP_STUBS);
-        when(plcDriverManagerMock.getConnection(anyString()).getWriter())
-            .thenReturn(Optional.of(mock(PlcWriter.class, RETURNS_DEEP_STUBS)));
+
+        when(plcDriverManagerMock.getConnection(anyString()).writeRequestBuilder())
+            .thenReturn(Optional.of(mock(PlcWriteRequest.Builder.class, RETURNS_DEEP_STUBS)));
+
         when(endpointMock.getPlcDriverManager()).thenReturn(plcDriverManagerMock);
         SUT = new Plc4XProducer(endpointMock);
         testExchange = mock(Exchange.class, RETURNS_DEEP_STUBS);
diff --git a/integrations/apache-camel/src/test/resources/META-INF/services/org.apache.plc4x.java.api.PlcDriver b/integrations/apache-camel/src/test/resources/META-INF/services/org.apache.plc4x.java.spi.PlcDriver
similarity index 100%
rename from integrations/apache-camel/src/test/resources/META-INF/services/org.apache.plc4x.java.api.PlcDriver
rename to integrations/apache-camel/src/test/resources/META-INF/services/org.apache.plc4x.java.spi.PlcDriver
diff --git a/integrations/apache-edgent/src/main/java/org/apache/plc4x/edgent/PlcConnectionAdapter.java b/integrations/apache-edgent/src/main/java/org/apache/plc4x/edgent/PlcConnectionAdapter.java
index 2a345605b..a52cd3442 100644
--- a/integrations/apache-edgent/src/main/java/org/apache/plc4x/edgent/PlcConnectionAdapter.java
+++ b/integrations/apache-edgent/src/main/java/org/apache/plc4x/edgent/PlcConnectionAdapter.java
@@ -23,9 +23,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.edgent.function.Function;
 import org.apache.edgent.function.Supplier;
 import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcReader;
-import org.apache.plc4x.java.api.connection.PlcWriter;
+import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcException;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
@@ -104,12 +102,12 @@ public void close() throws Exception {
     }
 
     public PlcReadRequest.Builder readRequestBuilder() throws PlcException {
-        return getConnection().getReader().orElseThrow(
-            () -> new PlcException("This connection doesn't support reading")).readRequestBuilder();
+        return getConnection().readRequestBuilder().orElseThrow(
+            () -> new PlcException("This connection doesn't support reading"));
     }
 
-    Supplier<PlcReadResponse<?>> newSupplier(PlcReadRequest readRequest) {
-        return new Supplier<PlcReadResponse<?>>() {
+    Supplier<PlcReadResponse> newSupplier(PlcReadRequest readRequest) {
+        return new Supplier<PlcReadResponse>() {
             private static final long serialVersionUID = 1L;
 
             @Override
@@ -117,9 +115,7 @@ public PlcReadResponse get() {
                 PlcConnection connection = null;
                 try {
                     connection = getConnection();
-                    PlcReader reader = connection.getReader()
-                        .orElseThrow(() -> new PlcException("This connection doesn't support reading"));
-                    return reader.read(readRequest).get();
+                    return readRequest.execute().get();
                 } catch (Exception e) {
                     logger.error("reading from plc device {} {} failed", connection, readRequest, e);
                     return null;
@@ -154,10 +150,8 @@ public T get() {
             PlcField field = null;
             try {
                 connection = getConnection();
-                PlcReader reader = connection.getReader()
-                    .orElseThrow(() -> new PlcException("This connection doesn't support reading"));
-                PlcReadRequest readRequest = reader.readRequestBuilder().addItem(FIELD_NAME, fieldQuery).build();
-                PlcReadResponse readResponse = reader.read(readRequest).get();
+                PlcReadRequest readRequest = connection.readRequestBuilder().orElseThrow(() -> new PlcException("This connection doesn't support reading")).addItem(FIELD_NAME, fieldQuery).build();
+                PlcReadResponse readResponse = readRequest.execute().get();
                 Object value = null;
                 switch (clientDatatype) {
                     case BYTE:
@@ -219,12 +213,10 @@ protected void write(PlcClientDatatype clientDatatype, String fieldQuery, Object
             PlcConnection connection = null;
             try {
                 connection = getConnection();
-                PlcWriter writer = connection.getWriter()
-                    .orElseThrow(() -> new PlcException("This connection doesn't support writing"));
-                PlcWriteRequest.Builder builder = writer.writeRequestBuilder();
+                PlcWriteRequest.Builder builder = connection.writeRequestBuilder().orElseThrow(() -> new PlcException("This connection doesn't support writing"));
                 PlcWriteRequest writeRequest = builder.build();
                 addItem(builder, clientDatatype, fieldQuery, fieldValue);
-                writer.write(writeRequest).get();
+                writeRequest.execute().get();
             } catch (Exception e) {
                 logger.error("writing to plc device {} {} failed", connection, fieldQuery, e);
             }
diff --git a/integrations/apache-edgent/src/main/java/org/apache/plc4x/edgent/PlcFunctions.java b/integrations/apache-edgent/src/main/java/org/apache/plc4x/edgent/PlcFunctions.java
index 942dbc3ca..ab00c82de 100644
--- a/integrations/apache-edgent/src/main/java/org/apache/plc4x/edgent/PlcFunctions.java
+++ b/integrations/apache-edgent/src/main/java/org/apache/plc4x/edgent/PlcFunctions.java
@@ -144,7 +144,7 @@ private PlcFunctions() {
         return adapter.newSupplier(LocalDateTime.class, PlcClientDatatype.DATE_TIME, addressStr);
     }
 
-    public static Supplier<PlcReadResponse<?>> batchSupplier(PlcConnectionAdapter adapter, PlcReadRequest readRequest) {
+    public static Supplier<PlcReadResponse> batchSupplier(PlcConnectionAdapter adapter, PlcReadRequest readRequest) {
         return adapter.newSupplier(readRequest);
     }
 
diff --git a/integrations/apache-edgent/src/test/java/org/apache/plc4x/edgent/PlcConnectionAdapterTest.java b/integrations/apache-edgent/src/test/java/org/apache/plc4x/edgent/PlcConnectionAdapterTest.java
index bb9c2affa..06ac34eb0 100644
--- a/integrations/apache-edgent/src/test/java/org/apache/plc4x/edgent/PlcConnectionAdapterTest.java
+++ b/integrations/apache-edgent/src/test/java/org/apache/plc4x/edgent/PlcConnectionAdapterTest.java
@@ -90,7 +90,7 @@ public void testCtor2() throws Exception {
         // TODO: smart value conversion
         connection.setFieldItem(plcField, new DefaultLongFieldItem(0L));
 
-        CompletableFuture<PlcReadResponse<?>> cf = connection.read(request);
+        CompletableFuture<PlcReadResponse> cf = connection.read(request);
 
         assertThat(cf.isDone(), is(true));
         PlcReadResponse response = cf.get();
@@ -105,7 +105,7 @@ public void testCtor2() throws Exception {
         PlcField plcField = request.getFields().get(0);
         connection.setFieldItem(plcField, new DefaultLongFieldItem(0L));
 
-        CompletableFuture<PlcWriteResponse<?>> cf = connection.write(request);
+        CompletableFuture<PlcWriteResponse> cf = connection.write(request);
 
         assertThat(cf.isDone(), is(true));
         PlcWriteResponse response = cf.get();
diff --git a/integrations/apache-edgent/src/test/java/org/apache/plc4x/edgent/mock/MockConnection.java b/integrations/apache-edgent/src/test/java/org/apache/plc4x/edgent/mock/MockConnection.java
index 54754cd5d..383d3c6f8 100644
--- a/integrations/apache-edgent/src/test/java/org/apache/plc4x/edgent/mock/MockConnection.java
+++ b/integrations/apache-edgent/src/test/java/org/apache/plc4x/edgent/mock/MockConnection.java
@@ -21,8 +21,8 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.plc4x.java.api.authentication.PlcAuthentication;
-import org.apache.plc4x.java.api.connection.PlcReader;
-import org.apache.plc4x.java.api.connection.PlcWriter;
+import org.apache.plc4x.java.base.messages.PlcReader;
+import org.apache.plc4x.java.base.messages.PlcWriter;
 import org.apache.plc4x.java.api.exceptions.PlcIoException;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
@@ -36,6 +36,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 public class MockConnection extends org.apache.plc4x.java.base.connection.MockConnection implements PlcReader, PlcWriter {
@@ -67,16 +68,16 @@ public String getUrl() {
     }
 
     @Override
-    public PlcReadRequest.Builder readRequestBuilder() {
-        return new DefaultPlcReadRequest.Builder(new MockFieldHandler());
+    public Optional<PlcReadRequest.Builder> readRequestBuilder() {
+        return Optional.of(new DefaultPlcReadRequest.Builder(this, new MockFieldHandler()));
     }
 
     @Override
-    public CompletableFuture<PlcReadResponse<?>> read(PlcReadRequest readRequest) {
+    public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) {
         curReadCnt++;
         if (readExceptionTriggerCount > 0 && curReadCnt == readExceptionTriggerCount) {
             curReadCnt = 0;
-            CompletableFuture<PlcReadResponse<?>> cf = new CompletableFuture<>();
+            CompletableFuture<PlcReadResponse> cf = new CompletableFuture<>();
             cf.completeExceptionally(new PlcIoException(readExceptionMsg));
             return cf;
         }
@@ -90,18 +91,18 @@ public String getUrl() {
     }
 
     @Override
-    public PlcWriteRequest.Builder writeRequestBuilder() {
-        return new DefaultPlcWriteRequest.Builder(new MockFieldHandler());
+    public Optional<PlcWriteRequest.Builder> writeRequestBuilder() {
+        return Optional.of(new DefaultPlcWriteRequest.Builder(this, new MockFieldHandler()));
     }
 
     @SuppressWarnings("unchecked")
     @Override
-    public CompletableFuture<PlcWriteResponse<?>> write(PlcWriteRequest writeRequest) {
+    public CompletableFuture<PlcWriteResponse> write(PlcWriteRequest writeRequest) {
         DefaultPlcWriteRequest defaultPlcWriteRequest = (DefaultPlcWriteRequest) writeRequest;
         curWriteCnt++;
         if (writeExceptionTriggerCount > 0 && curWriteCnt == writeExceptionTriggerCount) {
             curWriteCnt = 0;
-            CompletableFuture<PlcWriteResponse<?>> cf = new CompletableFuture<>();
+            CompletableFuture<PlcWriteResponse> cf = new CompletableFuture<>();
             cf.completeExceptionally(new PlcIoException(writeExceptionMsg));
             return cf;
         }
diff --git a/integrations/apache-edgent/src/test/java/org/apache/plc4x/edgent/mock/MockDriver.java b/integrations/apache-edgent/src/test/java/org/apache/plc4x/edgent/mock/MockDriver.java
index d5c574b35..887ce2ecb 100644
--- a/integrations/apache-edgent/src/test/java/org/apache/plc4x/edgent/mock/MockDriver.java
+++ b/integrations/apache-edgent/src/test/java/org/apache/plc4x/edgent/mock/MockDriver.java
@@ -18,9 +18,9 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.edgent.mock;
 
-import org.apache.plc4x.java.api.PlcDriver;
+import org.apache.plc4x.java.spi.PlcDriver;
 import org.apache.plc4x.java.api.authentication.PlcAuthentication;
-import org.apache.plc4x.java.api.connection.PlcConnection;
+import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 
 public class MockDriver implements PlcDriver {
diff --git a/integrations/apache-edgent/src/test/resources/META-INF/services/org.apache.plc4x.java.api.PlcDriver b/integrations/apache-edgent/src/test/resources/META-INF/services/org.apache.plc4x.java.spi.PlcDriver
similarity index 100%
rename from integrations/apache-edgent/src/test/resources/META-INF/services/org.apache.plc4x.java.api.PlcDriver
rename to integrations/apache-edgent/src/test/resources/META-INF/services/org.apache.plc4x.java.spi.PlcDriver
diff --git a/integrations/apache-kafka/config/source.properties b/integrations/apache-kafka/config/source.properties
index cbd00f576..afa7e93a3 100644
--- a/integrations/apache-kafka/config/source.properties
+++ b/integrations/apache-kafka/config/source.properties
@@ -19,6 +19,5 @@ limitations under the License.
 name=plc-source-test
 connector.class=org.apache.plc4x.kafka.Plc4xSourceConnector
 topic=test
-url=test:unused
-queries=RANDOM/foo:INTEGER,RANDOM/bar:STRING
+queries=test:unused#RANDOM/foo:INTEGER,test:another#RANDOM/bar:STRING
 rate=2000
\ No newline at end of file
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
index 648a32e4a..4c0429a8a 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
@@ -23,8 +23,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcWriter;
+import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.messages.PlcWriteRequest;
 import org.apache.plc4x.kafka.util.VersionUtil;
@@ -37,7 +36,6 @@ Licensed to the Apache Software Foundation (ASF) under one
     private String url;
 
     private PlcConnection plcConnection;
-    private PlcWriter plcWriter;
 
     @Override
     public String version() {
@@ -51,8 +49,9 @@ public void start(Map<String, String> props) {
 
         openConnection();
 
-        plcWriter = plcConnection.getWriter()
-            .orElseThrow(() -> new ConnectException("PlcReader not available for this type of connection"));
+        if (!plcConnection.writeRequestBuilder().isPresent()) {
+            throw new ConnectException("Writing not supported on this connection");
+        }
     }
 
     @Override
@@ -65,7 +64,7 @@ public void put(Collection<SinkRecord> records) {
         for (SinkRecord record: records) {
             String query = record.key().toString();
             Object value = record.value();
-            PlcWriteRequest.Builder builder = plcWriter.writeRequestBuilder();
+            PlcWriteRequest.Builder builder = plcConnection.writeRequestBuilder().get();
             PlcWriteRequest plcRequest = addToBuilder(builder, query, value).build();
             doWrite(plcRequest);
         }
@@ -107,7 +106,7 @@ private void closeConnection() {
 
     private void doWrite(PlcWriteRequest request) {
         try {
-            plcWriter.write(request).get();
+            request.execute().get();
         } catch (ExecutionException | InterruptedException e) {
             throw new ConnectException("Caught exception during write", e);
         }
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
index 4d014a535..bb1392e70 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
@@ -22,36 +22,28 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
-import org.apache.kafka.connect.util.ConnectorUtils;
 import org.apache.plc4x.kafka.util.VersionUtil;
 
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+import java.util.stream.Collectors;
 
 public class Plc4xSourceConnector extends SourceConnector {
-    static final String TOPIC_CONFIG = "topic";
+    private static final String TOPIC_CONFIG = "topic";
     private static final String TOPIC_DOC = "Kafka topic to publish to";
 
-    static final String URL_CONFIG = "url";
-    private static final String URL_DOC = "Connection string used by PLC4X to connect to the PLC";
-
-    static final String QUERIES_CONFIG = "queries";
+    private static final String QUERIES_CONFIG = "queries";
     private static final String QUERIES_DOC = "Field queries to be sent to the PLC";
 
-    static final String RATE_CONFIG = "rate";
+    private static final String RATE_CONFIG = "rate";
     private static final Integer RATE_DEFAULT = 1000;
     private static final String RATE_DOC = "Polling rate";
 
-    static final ConfigDef CONFIG_DEF = new ConfigDef()
+    private static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC)
-        .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC)
         .define(QUERIES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, QUERIES_DOC)
         .define(RATE_CONFIG, ConfigDef.Type.INT, RATE_DEFAULT, ConfigDef.Importance.MEDIUM, RATE_DOC);
 
     private String topic;
-    private String url;
     private List<String> queries;
     private Integer rate;
 
@@ -63,23 +55,30 @@ Licensed to the Apache Software Foundation (ASF) under one
     @Override
     public List<Map<String, String>> taskConfigs(int maxTasks) {
         List<Map<String, String>> configs = new LinkedList<>();
-        List<List<String>> queryGroups = ConnectorUtils.groupPartitions(queries, maxTasks);
-        for (List<String> queryGroup: queryGroups) {
+        Map<String, List<String>> groupedByHost = new HashMap<>();
+        queries.stream().map(query -> query.split("#", 2)).collect(Collectors.groupingBy(parts -> parts[0])).forEach((host, queries) -> {
+            groupedByHost.put(host, queries.stream().map(parts -> parts[1]).collect(Collectors.toList()));
+        });
+        if (groupedByHost.size() > maxTasks) {
+            // One task is needed per PLC host; maxTasks is smaller than the number of hosts, so no task configs are returned.
+            // TODO: throw exception?
+            return Collections.emptyList();
+        }
+        groupedByHost.forEach((host, qs) -> {
             Map<String, String> taskConfig = new HashMap<>();
-            taskConfig.put(TOPIC_CONFIG, topic);
-            taskConfig.put(URL_CONFIG, url);
-            taskConfig.put(QUERIES_CONFIG, String.join(",", queryGroup));
-            taskConfig.put(RATE_CONFIG, rate.toString());
+            taskConfig.put(Plc4xSourceTask.TOPIC_CONFIG, topic);
+            taskConfig.put(Plc4xSourceTask.URL_CONFIG, host);
+            taskConfig.put(Plc4xSourceTask.QUERIES_CONFIG, String.join(",", qs));
+            taskConfig.put(Plc4xSourceTask.RATE_CONFIG, rate.toString());
             configs.add(taskConfig);
-        }
+        });
         return configs;
     }
 
     @Override
     public void start(Map<String, String> props) {
-        AbstractConfig config = new AbstractConfig(Plc4xSourceConnector.CONFIG_DEF, props);
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
         topic = config.getString(TOPIC_CONFIG);
-        url = config.getString(URL_CONFIG);
         queries = config.getList(QUERIES_CONFIG);
         rate = config.getInt(RATE_CONFIG);
     }
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
index 7c048c5e0..f172a3872 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
@@ -19,6 +19,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 package org.apache.plc4x.kafka;
 
 import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
@@ -26,8 +27,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcReader;
+import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
@@ -44,6 +44,25 @@ Licensed to the Apache Software Foundation (ASF) under one
  * If the flag does not become true, the method returns null, otherwise a fetch is performed.
  */
 public class Plc4xSourceTask extends SourceTask {
+    static final String TOPIC_CONFIG = "topic";
+    private static final String TOPIC_DOC = "Kafka topic to publish to";
+
+    static final String URL_CONFIG = "url";
+    private static final String URL_DOC = "PLC URL";
+
+    static final String QUERIES_CONFIG = "queries";
+    private static final String QUERIES_DOC = "Field queries to be sent to the PLC";
+
+    static final String RATE_CONFIG = "rate";
+    private static final Integer RATE_DEFAULT = 1000;
+    private static final String RATE_DOC = "Polling rate";
+
+    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+        .define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC)
+        .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC)
+        .define(QUERIES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, QUERIES_DOC)
+        .define(RATE_CONFIG, ConfigDef.Type.INT, RATE_DEFAULT, ConfigDef.Importance.MEDIUM, RATE_DOC);
+
     private final static long WAIT_LIMIT_MILLIS = 100;
     private final static long TIMEOUT_LIMIT_MILLIS = 5000;
 
@@ -61,8 +80,6 @@ Licensed to the Apache Software Foundation (ASF) under one
     private List<String> queries;
 
     private PlcConnection plcConnection;
-    private PlcReader plcReader;
-    private PlcReadRequest plcRequest;
 
     // TODO: should we use shared (static) thread pool for this?
     private ScheduledExecutorService scheduler;
@@ -75,24 +92,18 @@ public String version() {
 
     @Override
     public void start(Map<String, String> props) {
-        AbstractConfig config = new AbstractConfig(Plc4xSourceConnector.CONFIG_DEF, props);
-        topic = config.getString(Plc4xSourceConnector.TOPIC_CONFIG);
-        url = config.getString(Plc4xSourceConnector.URL_CONFIG);
-        queries = config.getList(Plc4xSourceConnector.QUERIES_CONFIG);
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
+        topic = config.getString(TOPIC_CONFIG);
+        url = config.getString(URL_CONFIG);
+        queries = config.getList(QUERIES_CONFIG);
 
         openConnection();
 
-        plcReader = plcConnection.getReader()
-            .orElseThrow(() -> new ConnectException("PlcReader not available for this type of connection"));
-
-
-        PlcReadRequest.Builder builder = plcReader.readRequestBuilder();
-        for (String query : queries) {
-            builder.addItem(query, query);
+        if (!plcConnection.readRequestBuilder().isPresent()) {
+            throw new ConnectException("Reading not supported on this connection");
         }
-        plcRequest = builder.build();
 
-        int rate = Integer.valueOf(props.get(Plc4xSourceConnector.RATE_CONFIG));
+        int rate = Integer.valueOf(props.get(RATE_CONFIG));
         scheduler = Executors.newScheduledThreadPool(1);
         scheduler.scheduleAtFixedRate(Plc4xSourceTask.this::scheduleFetch, rate, rate, TimeUnit.MILLISECONDS);
     }
@@ -154,9 +165,9 @@ private synchronized boolean awaitFetch(long milliseconds) throws InterruptedExc
     }
 
     private List<SourceRecord> doFetch() throws InterruptedException {
-        final CompletableFuture<PlcReadResponse<?>> response = plcReader.read(plcRequest);
+        final CompletableFuture<? extends PlcReadResponse> response = createReadRequest().execute();
         try {
-            final PlcReadResponse<?> received = response.get(TIMEOUT_LIMIT_MILLIS, TimeUnit.MILLISECONDS);
+            final PlcReadResponse received = response.get(TIMEOUT_LIMIT_MILLIS, TimeUnit.MILLISECONDS);
             return extractValues(received);
         } catch (ExecutionException e) {
             throw new ConnectException("Could not fetch data from source", e);
@@ -165,7 +176,15 @@ private synchronized boolean awaitFetch(long milliseconds) throws InterruptedExc
         }
     }
 
-    private List<SourceRecord> extractValues(PlcReadResponse<?> response) {
+    private PlcReadRequest createReadRequest() {
+        PlcReadRequest.Builder builder = plcConnection.readRequestBuilder().get();
+        for (String query : queries) {
+            builder.addItem(query, query);
+        }
+        return builder.build();
+    }
+
+    private List<SourceRecord> extractValues(PlcReadResponse response) {
         final List<SourceRecord> result = new LinkedList<>();
         for (String query : queries) {
             final PlcResponseCode rc = response.getResponseCode(query);
diff --git a/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java b/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
index 1f8cb852c..080354a52 100644
--- a/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
+++ b/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
@@ -27,7 +27,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.api.connection.PlcConnection;
+import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 
 import java.util.*;
diff --git a/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java b/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
index 4c8721b25..d743e573a 100644
--- a/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
+++ b/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
@@ -28,7 +28,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.plc4x.java.api.connection.PlcWriter;
+import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.messages.PlcWriteRequest;
 import org.apache.plc4x.java.api.messages.PlcWriteResponse;
 
@@ -51,11 +51,13 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         }
 
         // Get an instance of a component able to write to a PLC.
-        PlcWriter writer = getConnection().getWriter().orElseThrow(
-            () -> new ProcessException("Writing not supported by connection"));
+        PlcConnection connection = getConnection();
+        if (!connection.writeRequestBuilder().isPresent()) {
+            throw new ProcessException("Writing not supported by connection");
+        }
 
         // Prepare the request.
-        PlcWriteRequest.Builder builder = writer.writeRequestBuilder();
+        PlcWriteRequest.Builder builder = connection.writeRequestBuilder().get();
         flowFile.getAttributes().forEach((field, value) -> {
             String address = getAddress(field);
             if(address != null) {
@@ -65,7 +67,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         PlcWriteRequest writeRequest = builder.build();
 
         // Send the request to the PLC.
-        CompletableFuture<PlcWriteResponse<?>> future = writer.write(writeRequest);
+        CompletableFuture<? extends PlcWriteResponse> future = writeRequest.execute();
         future.whenComplete((response, throwable) -> {
             if (throwable != null) {
                 session.transfer(session.create(), FAILURE);
diff --git a/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java b/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
index d7a3b7374..09fabff2f 100644
--- a/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
+++ b/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
@@ -27,7 +27,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.plc4x.java.api.connection.PlcReader;
+import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
 import org.json.simple.JSONObject;
@@ -45,23 +45,25 @@ Licensed to the Apache Software Foundation (ASF) under one
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         // Get an instance of a component able to read from a PLC.
-        PlcReader reader = getConnection().getReader().orElseThrow(
-            () -> new ProcessException("Writing not supported by connection"));
+        PlcConnection connection = getConnection();
 
         // Prepare the request.
-        PlcReadRequest.Builder builder = reader.readRequestBuilder();
-        getFields().forEach(field -> {
-            String address = getAddress(field);
-            if(address != null) {
-                builder.addItem(field, address);
-            }
-        });
-        PlcReadRequest readRequest = builder.build();
+        if (!connection.readRequestBuilder().isPresent()) {
+            throw new ProcessException("Writing not supported by connection");
+        }
 
         FlowFile flowFile = session.create();
         session.append(flowFile, out -> {
             try {
-                PlcReadResponse<?> response = reader.read(readRequest).get();
+                PlcReadRequest.Builder builder = connection.readRequestBuilder().get();
+                getFields().forEach(field -> {
+                    String address = getAddress(field);
+                    if(address != null) {
+                        builder.addItem(field, address);
+                    }
+                });
+                PlcReadRequest readRequest = builder.build();
+                PlcReadResponse response = readRequest.execute().get();
                 JSONObject obj = new JSONObject();
                 for (String fieldName : response.getFieldNames()) {
                     for(int i = 0; i < response.getNumberOfValues(fieldName); i++) {
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcConnection.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/PlcConnection.java
similarity index 76%
rename from plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcConnection.java
rename to plc4j/api/src/main/java/org/apache/plc4x/java/api/PlcConnection.java
index c508217c2..c1a78c56e 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcConnection.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/PlcConnection.java
@@ -16,9 +16,13 @@ Licensed to the Apache Software Foundation (ASF) under one
 specific language governing permissions and limitations
 under the License.
 */
-package org.apache.plc4x.java.api.connection;
+package org.apache.plc4x.java.api;
 
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcWriteRequest;
 
 import java.util.Optional;
 
@@ -53,10 +57,12 @@ Licensed to the Apache Software Foundation (ASF) under one
     @Override
     void close() throws Exception;
 
-    Optional<PlcReader> getReader();
+    Optional<PlcReadRequest.Builder> readRequestBuilder();
 
-    Optional<PlcWriter> getWriter();
+    Optional<PlcWriteRequest.Builder> writeRequestBuilder();
 
-    Optional<PlcSubscriber> getSubscriber();
+    Optional<PlcSubscriptionRequest.Builder> subscriptionRequestBuilder();
+
+    Optional<PlcUnsubscriptionRequest.Builder> unsubscriptionRequestBuilder();
 
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java
deleted file mode 100644
index 0581bd690..000000000
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-   http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied.  See the License for the
- specific language governing permissions and limitations
- under the License.
- */
-package org.apache.plc4x.java.api.connection;
-
-import org.apache.plc4x.java.api.messages.*;
-import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
-import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.function.Consumer;
-
-/**
- * Interface implemented by all PlcConnections that are able to receive notifications from remote resources.
- */
-public interface PlcSubscriber {
-
-    /**
-     * Subscribes to fields on the PLC.
-     *
-     * @param subscriptionRequest subscription request containing at least one subscription request item.
-     * @return subscription response containing a subscription response item for each subscription request item.
-     */
-    CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionRequest subscriptionRequest);
-
-    /**
-     * Subscribes to fields on the PLC.
-     *
-     * @param plcSubscriptionRequestBuilderConsumer consumer which can be used to build requests.
-     * @return subscription response containing a subscription response item for each subscription request item.
-     */
-    default CompletableFuture<PlcSubscriptionResponse> subscribe(Consumer<PlcSubscriptionRequest.Builder> plcSubscriptionRequestBuilderConsumer) {
-        PlcSubscriptionRequest.Builder builder = subscriptionRequestBuilder();
-        plcSubscriptionRequestBuilderConsumer.accept(builder);
-        return subscribe(builder.build());
-    }
-
-    /**
-     * Unsubscribes from fields on the PLC. For unsubscribing the unsubscription request uses the subscription
-     * handle returned as part of the subscription response item.
-     *
-     * @param unsubscriptionRequest unsubscription request containing at least one unsubscription request item.
-     * @return unsubscription response containing a unsubscription response item for each unsubscription request item.
-     */
-    CompletableFuture<PlcUnsubscriptionResponse> unsubscribe(PlcUnsubscriptionRequest unsubscriptionRequest);
-
-    /**
-     * Unsubscribes from fields on the PLC. For unsubscribing the unsubscription request uses the subscription
-     * handle returned as part of the subscription response item.
-     *
-     * @param plcSubscriptionRequestBuilderConsumer consumer which can be used to build requests.
-     * @return unsubscription response containing a unsubscription response item for each unsubscription request item.
-     */
-    default CompletableFuture<PlcUnsubscriptionResponse> unsubscribe(Consumer<PlcUnsubscriptionRequest.Builder> plcSubscriptionRequestBuilderConsumer) {
-        PlcUnsubscriptionRequest.Builder builder = unsubscriptionRequestBuilder();
-        plcSubscriptionRequestBuilderConsumer.accept(builder);
-        return unsubscribe(builder.build());
-    }
-
-    /**
-     * Convenience method to subscribe a {@link Consumer} to all fields of the subscription.
-     *
-     * @param subscriptionRequest subscription request
-     * @param consumer            consumer for all {@link PlcSubscriptionEvent}s
-     * @return TODO: document me
-     * @throws ExecutionException   something went wrong.
-     * @throws InterruptedException something went wrong.
-     */
-    default PlcConsumerRegistration register(PlcSubscriptionRequest subscriptionRequest, Consumer<PlcSubscriptionEvent> consumer) throws ExecutionException, InterruptedException {
-        PlcSubscriptionResponse plcSubscriptionResponse = subscribe(subscriptionRequest).get();
-        // TODO: we need to return the plcSubscriptionResponse here too as we need this to unsubscribe...
-        return register(consumer, plcSubscriptionResponse.getSubscriptionHandles().toArray(new PlcSubscriptionHandle[0]));
-    }
-
-    /**
-     * Convenience method to subscribe a {@link Consumer} to all fields of the subscription.
-     *
-     * @param subscriptionRequestBuilderConsumer consumer for building subscription request.
-     * @param consumer                           consumer for all {@link PlcSubscriptionEvent}s
-     * @return TODO: document me
-     * @throws ExecutionException   something went wrong.
-     * @throws InterruptedException something went wrong.
-     */
-    default PlcConsumerRegistration register(Consumer<PlcSubscriptionRequest.Builder> subscriptionRequestBuilderConsumer, Consumer<PlcSubscriptionEvent> consumer) throws ExecutionException, InterruptedException {
-        PlcSubscriptionRequest.Builder builder = subscriptionRequestBuilder();
-        subscriptionRequestBuilderConsumer.accept(builder);
-        return register(builder.build(), consumer);
-    }
-
-    /**
-     * @param consumer
-     * @param handles
-     * @return TODO: document me
-     */
-    PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer, Collection<PlcSubscriptionHandle> handles);
-
-    default PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer, PlcSubscriptionHandle... handles) {
-        return register(consumer, Arrays.asList(handles));
-    }
-
-    /**
-     * // TODO: document me.
-     */
-    void unregister(PlcConsumerRegistration registration);
-
-    PlcSubscriptionRequest.Builder subscriptionRequestBuilder();
-
-    PlcUnsubscriptionRequest.Builder unsubscriptionRequestBuilder();
-
-}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcFieldRequest.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcFieldRequest.java
index cff0cb8d3..85263b76a 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcFieldRequest.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcFieldRequest.java
@@ -22,9 +22,13 @@ Licensed to the Apache Software Foundation (ASF) under one
 
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
+import java.util.concurrent.CompletableFuture;
 
 public interface PlcFieldRequest extends PlcRequest {
 
+    @Override
+    CompletableFuture<? extends PlcFieldResponse> execute();
+
     int getNumberOfFields();
 
     LinkedHashSet<String> getFieldNames();
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcFieldResponse.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcFieldResponse.java
index 768eb586d..cf2be496f 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcFieldResponse.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcFieldResponse.java
@@ -26,9 +26,11 @@ Licensed to the Apache Software Foundation (ASF) under one
 /**
  * Base type for all response messages sent as response for a prior request
  * from a plc to the plc4x system.
- * @param <REQUEST_TYPE> the type of the matching request.
  */
-public interface PlcFieldResponse<REQUEST_TYPE extends PlcFieldRequest> extends PlcResponse<REQUEST_TYPE> {
+public interface PlcFieldResponse extends PlcResponse {
+
+    @Override
+    PlcFieldRequest getRequest();
 
     Collection<String> getFieldNames();
 
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcReadRequest.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcReadRequest.java
index 2b9a7d53b..e60afc341 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcReadRequest.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcReadRequest.java
@@ -18,13 +18,23 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.java.api.messages;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
  * Request to read one or more values from a plc.
  */
 public interface PlcReadRequest extends PlcFieldRequest {
 
-    interface Builder extends PlcMessageBuilder<PlcReadRequest> {
+    @Override
+    CompletableFuture<? extends PlcReadResponse> execute();
+
+    interface Builder extends PlcRequestBuilder {
+
+        @Override
+        PlcReadRequest build();
+
         Builder addItem(String name, String fieldQuery);
+
     }
 
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcReadResponse.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcReadResponse.java
index bf5900b8a..e160b1667 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcReadResponse.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcReadResponse.java
@@ -28,7 +28,10 @@ Licensed to the Apache Software Foundation (ASF) under one
 /**
  * Response to a {@link PlcReadRequest}.
  */
-public interface PlcReadResponse<T extends PlcReadRequest> extends PlcFieldResponse<T> {
+public interface PlcReadResponse extends PlcFieldResponse {
+
+    @Override
+    PlcReadRequest getRequest();
 
     int getNumberOfValues(String name);
 
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequest.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequest.java
index f2367f97c..4b40bf119 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequest.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequest.java
@@ -18,9 +18,11 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.java.api.messages;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
  * Base type for all messages sent from the plc4x system to a connected plc.
  */
 public interface PlcRequest extends PlcMessage {
-
+    CompletableFuture<? extends PlcResponse> execute();
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcMessageBuilder.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequestBuilder.java
similarity index 92%
rename from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcMessageBuilder.java
rename to plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequestBuilder.java
index fd2d20ed5..b5f9b80c8 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcMessageBuilder.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequestBuilder.java
@@ -18,8 +18,6 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.java.api.messages;
 
-public interface PlcMessageBuilder<T> {
-
-    T build();
-
+public interface PlcRequestBuilder {
+    PlcRequest build();
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcResponse.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcResponse.java
index 078176235..7fdcffc14 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcResponse.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcResponse.java
@@ -21,10 +21,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 /**
  * Base type for all response messages sent as response for a prior request
  * from a plc to the plc4x system.
- * @param <REQUEST_TYPE> the type of the matching request.
  */
-public interface PlcResponse<REQUEST_TYPE extends PlcRequest> extends PlcMessage {
-
-    REQUEST_TYPE getRequest();
-
+public interface PlcResponse extends PlcMessage {
+    PlcRequest getRequest();
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java
index 19cd66e7c..3e51d6f82 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java
@@ -19,10 +19,18 @@ Licensed to the Apache Software Foundation (ASF) under one
 package org.apache.plc4x.java.api.messages;
 
 import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
 
 public interface PlcSubscriptionRequest extends PlcFieldRequest {
 
-    interface Builder extends PlcMessageBuilder<PlcSubscriptionRequest> {
+    @Override
+    CompletableFuture<? extends PlcSubscriptionResponse> execute();
+
+    interface Builder extends PlcRequestBuilder {
+
+        @Override
+        PlcSubscriptionRequest build();
+
         /**
          * Adds a new field to the to be constructed request which should be polled cyclically.
          *
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionResponse.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionResponse.java
index 57d71c516..30b802a2f 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionResponse.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionResponse.java
@@ -22,7 +22,10 @@ Licensed to the Apache Software Foundation (ASF) under one
 
 import java.util.Collection;
 
-public interface PlcSubscriptionResponse extends PlcFieldResponse<PlcSubscriptionRequest> {
+public interface PlcSubscriptionResponse extends PlcFieldResponse {
+
+    @Override
+    PlcSubscriptionRequest getRequest();
 
     PlcSubscriptionHandle getSubscriptionHandle(String name);
 
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java
index ac7b66a0c..8b95f4439 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java
@@ -21,10 +21,18 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
 
 import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
 
 public interface PlcUnsubscriptionRequest extends PlcFieldRequest {
 
-    interface Builder extends PlcMessageBuilder<PlcUnsubscriptionRequest> {
+    @Override
+    CompletableFuture<? extends PlcUnsubscriptionResponse> execute();
+
+    interface Builder extends PlcRequestBuilder {
+
+        @Override
+        PlcUnsubscriptionRequest build();
+
         /**
          * TODO document me:
          *
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionResponse.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionResponse.java
index 9764f8dd6..4205e8b0b 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionResponse.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionResponse.java
@@ -18,17 +18,9 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.java.api.messages;
 
-public interface PlcUnsubscriptionResponse extends PlcFieldResponse<PlcUnsubscriptionRequest> {
+public interface PlcUnsubscriptionResponse extends PlcFieldResponse {
 
-    interface Builder extends PlcMessageBuilder<PlcUnsubscriptionResponse> {
-        /**
-         * Adds a new field to the to be constructed request which should cancel a previously
-         * created subscription.
-         *
-         * @param name alias of the field.
-         * @return builder.
-         */
-        PlcReadRequest.Builder addField(String name);
-    }
+    @Override
+    PlcUnsubscriptionRequest getRequest();
 
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcWriteRequest.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcWriteRequest.java
index 69ac6bf70..95de45872 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcWriteRequest.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcWriteRequest.java
@@ -23,12 +23,19 @@ Licensed to the Apache Software Foundation (ASF) under one
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.util.concurrent.CompletableFuture;
 
 public interface PlcWriteRequest extends PlcFieldRequest {
 
+    @Override
+    CompletableFuture<? extends PlcWriteResponse> execute();
+
     int getNumberOfValues(String name);
 
-    interface Builder extends PlcMessageBuilder<PlcWriteRequest> {
+    interface Builder extends PlcRequestBuilder {
+
+        @Override
+        PlcWriteRequest build();
 
         PlcWriteRequest.Builder addItem(String name, String fieldQuery, Boolean... values);
 
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcWriteResponse.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcWriteResponse.java
index 3bfa79cdd..035a3d73e 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcWriteResponse.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcWriteResponse.java
@@ -18,6 +18,9 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.java.api.messages;
 
-public interface PlcWriteResponse<T extends PlcWriteRequest> extends PlcFieldResponse<T> {
+public interface PlcWriteResponse extends PlcFieldResponse {
+
+    @Override
+    PlcWriteRequest getRequest();
 
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/PlcDriver.java b/plc4j/api/src/main/java/org/apache/plc4x/java/spi/PlcDriver.java
similarity index 94%
rename from plc4j/api/src/main/java/org/apache/plc4x/java/api/PlcDriver.java
rename to plc4j/api/src/main/java/org/apache/plc4x/java/spi/PlcDriver.java
index c2d45fb1f..6247f17a8 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/PlcDriver.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/spi/PlcDriver.java
@@ -16,16 +16,16 @@ Licensed to the Apache Software Foundation (ASF) under one
 specific language governing permissions and limitations
 under the License.
 */
-package org.apache.plc4x.java.api;
+package org.apache.plc4x.java.spi;
 
+import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.authentication.PlcAuthentication;
-import org.apache.plc4x.java.api.connection.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 
 /**
  * General interface defining the minimal methods required for adding a new type of driver to the PLC4J system.
  *
- * <b>Note that each driver has to add a service file called org.apache.plc4x.java.PlcDriver to
+ * <b>Note that each driver has to add a service file called org.apache.plc4x.java.spi.PlcDriver to
  * src/main/resources/META-INF which contains the fully qualified classname in order to get loaded
  * by the PlcDriverManager instances.</b>
  */
diff --git a/plc4j/core/src/main/java/org/apache/plc4x/java/PlcDriverManager.java b/plc4j/core/src/main/java/org/apache/plc4x/java/PlcDriverManager.java
index 8539a61e8..3b50927ae 100644
--- a/plc4j/core/src/main/java/org/apache/plc4x/java/PlcDriverManager.java
+++ b/plc4j/core/src/main/java/org/apache/plc4x/java/PlcDriverManager.java
@@ -18,9 +18,9 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.java;
 
-import org.apache.plc4x.java.api.PlcDriver;
+import org.apache.plc4x.java.spi.PlcDriver;
 import org.apache.plc4x.java.api.authentication.PlcAuthentication;
-import org.apache.plc4x.java.api.connection.PlcConnection;
+import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 
 import java.net.URI;
diff --git a/plc4j/core/src/test/java/org/apache/plc4x/java/mock/DoubleMockDriver.java b/plc4j/core/src/test/java/org/apache/plc4x/java/mock/DoubleMockDriver.java
index 3c5e3d534..05250ab72 100644
--- a/plc4j/core/src/test/java/org/apache/plc4x/java/mock/DoubleMockDriver.java
+++ b/plc4j/core/src/test/java/org/apache/plc4x/java/mock/DoubleMockDriver.java
@@ -18,9 +18,9 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.java.mock;
 
-import org.apache.plc4x.java.api.PlcDriver;
+import org.apache.plc4x.java.spi.PlcDriver;
 import org.apache.plc4x.java.api.authentication.PlcAuthentication;
-import org.apache.plc4x.java.api.connection.PlcConnection;
+import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 
 public class DoubleMockDriver implements PlcDriver {
diff --git a/plc4j/core/src/test/java/org/apache/plc4x/java/mock/MockConnection.java b/plc4j/core/src/test/java/org/apache/plc4x/java/mock/MockConnection.java
index 0be37f96b..fc5210085 100644
--- a/plc4j/core/src/test/java/org/apache/plc4x/java/mock/MockConnection.java
+++ b/plc4j/core/src/test/java/org/apache/plc4x/java/mock/MockConnection.java
@@ -22,10 +22,15 @@ Licensed to the Apache Software Foundation (ASF) under one
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelInitializer;
 import org.apache.plc4x.java.api.authentication.PlcAuthentication;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcWriteRequest;
 import org.apache.plc4x.java.api.model.PlcField;
 import org.apache.plc4x.java.base.connection.AbstractPlcConnection;
 import org.apache.plc4x.java.base.connection.TestChannelFactory;
 
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 public class MockConnection extends AbstractPlcConnection {
@@ -37,6 +42,27 @@ public MockConnection(PlcAuthentication authentication) {
         this.authentication = authentication;
     }
 
+    @Override
+    public Optional<PlcReadRequest.Builder> readRequestBuilder() {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<PlcWriteRequest.Builder> writeRequestBuilder() {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<PlcSubscriptionRequest.Builder> subscriptionRequestBuilder() {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<PlcUnsubscriptionRequest.Builder> unsubscriptionRequestBuilder() {
+        return Optional.empty();
+    }
+
+
     @Override
     protected ChannelHandler getChannelHandler(CompletableFuture<Void> sessionSetupCompleteFuture) {
         return new ChannelInitializer() {
diff --git a/plc4j/core/src/test/java/org/apache/plc4x/java/mock/MockDriver.java b/plc4j/core/src/test/java/org/apache/plc4x/java/mock/MockDriver.java
index 6d47d0f90..477155029 100644
--- a/plc4j/core/src/test/java/org/apache/plc4x/java/mock/MockDriver.java
+++ b/plc4j/core/src/test/java/org/apache/plc4x/java/mock/MockDriver.java
@@ -18,9 +18,9 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.java.mock;
 
-import org.apache.plc4x.java.api.PlcDriver;
+import org.apache.plc4x.java.spi.PlcDriver;
 import org.apache.plc4x.java.api.authentication.PlcAuthentication;
-import org.apache.plc4x.java.api.connection.PlcConnection;
+import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 
 public class MockDriver implements PlcDriver {
diff --git a/plc4j/core/src/test/resources/META-INF/services/org.apache.plc4x.java.api.PlcDriver b/plc4j/core/src/test/resources/META-INF/services/org.apache.plc4x.java.spi.PlcDriver
similarity index 100%
rename from plc4j/core/src/test/resources/META-INF/services/org.apache.plc4x.java.api.PlcDriver
rename to plc4j/core/src/test/resources/META-INF/services/org.apache.plc4x.java.spi.PlcDriver
diff --git a/plc4j/core/src/test/resources/test/META-INF/services/org.apache.plc4x.java.api.PlcDriver b/plc4j/core/src/test/resources/test/META-INF/services/org.apache.plc4x.java.spi.PlcDriver
similarity index 100%
rename from plc4j/core/src/test/resources/test/META-INF/services/org.apache.plc4x.java.api.PlcDriver
rename to plc4j/core/src/test/resources/test/META-INF/services/org.apache.plc4x.java.spi.PlcDriver
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/AdsPlcDriver.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/AdsPlcDriver.java
index 2a208adc9..1ce766664 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/AdsPlcDriver.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/AdsPlcDriver.java
@@ -22,9 +22,9 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.plc4x.java.ads.api.generic.types.AmsNetId;
 import org.apache.plc4x.java.ads.api.generic.types.AmsPort;
 import org.apache.plc4x.java.ads.connection.AdsConnectionFactory;
-import org.apache.plc4x.java.api.PlcDriver;
+import org.apache.plc4x.java.spi.PlcDriver;
 import org.apache.plc4x.java.api.authentication.PlcAuthentication;
-import org.apache.plc4x.java.api.connection.PlcConnection;
+import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 
 import java.net.InetAddress;
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnection.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnection.java
index a80e5e747..5ac73efe0 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnection.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnection.java
@@ -31,9 +31,9 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.plc4x.java.ads.model.AdsPlcFieldHandler;
 import org.apache.plc4x.java.ads.model.DirectAdsField;
 import org.apache.plc4x.java.ads.model.SymbolicAdsField;
-import org.apache.plc4x.java.api.connection.PlcProprietarySender;
-import org.apache.plc4x.java.api.connection.PlcReader;
-import org.apache.plc4x.java.api.connection.PlcWriter;
+import org.apache.plc4x.java.base.messages.PlcProprietarySender;
+import org.apache.plc4x.java.base.messages.PlcReader;
+import org.apache.plc4x.java.base.messages.PlcWriter;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
 import org.apache.plc4x.java.api.messages.*;
@@ -43,6 +43,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Optional;
 import java.util.concurrent.*;
 
 public abstract class AdsAbstractPlcConnection extends AbstractPlcConnection implements PlcReader, PlcWriter, PlcProprietarySender {
@@ -92,7 +93,7 @@ public AmsPort getSourceAmsPort() {
     }
 
     @Override
-    public CompletableFuture<PlcReadResponse<?>> read(PlcReadRequest readRequest) {
+    public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) {
         mapFields(readRequest);
         CompletableFuture<InternalPlcReadResponse> readFuture = new CompletableFuture<>();
         ChannelFuture channelFuture = channel.writeAndFlush(new PlcRequestContainer<>((InternalPlcReadRequest) readRequest, readFuture));
@@ -106,12 +107,27 @@ public AmsPort getSourceAmsPort() {
     }
 
     @Override
-    public PlcReadRequest.Builder readRequestBuilder() {
-        return new DefaultPlcReadRequest.Builder(new AdsPlcFieldHandler());
+    public Optional<PlcReadRequest.Builder> readRequestBuilder() {
+        return Optional.of(new DefaultPlcReadRequest.Builder(this, new AdsPlcFieldHandler()));
     }
 
     @Override
-    public CompletableFuture<PlcWriteResponse<?>> write(PlcWriteRequest writeRequest) {
+    public Optional<PlcWriteRequest.Builder> writeRequestBuilder() {
+        return Optional.of(new DefaultPlcWriteRequest.Builder(this, new AdsPlcFieldHandler()));
+    }
+
+    @Override
+    public Optional<PlcSubscriptionRequest.Builder> subscriptionRequestBuilder() {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<PlcUnsubscriptionRequest.Builder> unsubscriptionRequestBuilder() {
+        return Optional.empty();
+    }
+
+    @Override
+    public CompletableFuture<PlcWriteResponse> write(PlcWriteRequest writeRequest) {
         mapFields(writeRequest);
         CompletableFuture<InternalPlcWriteResponse> writeFuture = new CompletableFuture<>();
         ChannelFuture channelFuture = channel.writeAndFlush(new PlcRequestContainer<>((InternalPlcWriteRequest) writeRequest, writeFuture));
@@ -125,14 +141,9 @@ public AmsPort getSourceAmsPort() {
     }
 
     @Override
-    public PlcWriteRequest.Builder writeRequestBuilder() {
-        return new DefaultPlcWriteRequest.Builder(new AdsPlcFieldHandler());
-    }
-
-    @Override
-    public <PROP_REQUEST, PROP_RESPONSE> CompletableFuture<PlcProprietaryResponse<PlcProprietaryRequest<PROP_REQUEST>, PROP_RESPONSE>> send(PlcProprietaryRequest<PROP_REQUEST> proprietaryRequest) {
-        CompletableFuture<InternalPlcProprietaryResponse<PROP_REQUEST, PROP_RESPONSE>> sendFuture = new CompletableFuture<>();
-        ChannelFuture channelFuture = channel.writeAndFlush(new PlcRequestContainer<>((InternalPlcProprietaryRequest<PROP_REQUEST>) proprietaryRequest, sendFuture));
+    public <T> CompletableFuture<PlcProprietaryResponse<T>> send(PlcProprietaryRequest proprietaryRequest) {
+        CompletableFuture<InternalPlcProprietaryResponse<T>> sendFuture = new CompletableFuture<>();
+        ChannelFuture channelFuture = channel.writeAndFlush(new PlcRequestContainer<>((InternalPlcProprietaryRequest) proprietaryRequest, sendFuture));
         channelFuture.addListener(future -> {
             if (!future.isSuccess()) {
                 sendFuture.completeExceptionally(future.cause());
@@ -168,9 +179,9 @@ protected void mapFields(SymbolicAdsField symbolicAdsField) {
             );
 
             // TODO: This is blocking, should be changed to be async.
-            CompletableFuture<InternalPlcProprietaryResponse<InternalPlcProprietaryRequest<AdsWriteRequest>, AdsReadWriteResponse>> getHandelFuture = new CompletableFuture<>();
+            CompletableFuture<InternalPlcProprietaryResponse<AdsReadWriteResponse>> getHandelFuture = new CompletableFuture<>();
             channel.writeAndFlush(new PlcRequestContainer<>(new DefaultPlcProprietaryRequest<>(adsReadWriteRequest), getHandelFuture));
-            InternalPlcProprietaryResponse<InternalPlcProprietaryRequest<AdsWriteRequest>, AdsReadWriteResponse> getHandleResponse = getFromFuture(getHandelFuture, SYMBOL_RESOLVE_TIMEOUT);
+            InternalPlcProprietaryResponse<AdsReadWriteResponse> getHandleResponse = getFromFuture(getHandelFuture, SYMBOL_RESOLVE_TIMEOUT);
             AdsReadWriteResponse response = getHandleResponse.getResponse();
 
             if (response.getResult().toAdsReturnCode() != AdsReturnCode.ADS_CODE_0) {
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsSerialPlcConnection.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsSerialPlcConnection.java
index 06d52ca66..3d4c8bae9 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsSerialPlcConnection.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsSerialPlcConnection.java
@@ -28,9 +28,12 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.plc4x.java.ads.protocol.Payload2SerialProtocol;
 import org.apache.plc4x.java.ads.protocol.Plc4x2AdsProtocol;
 import org.apache.plc4x.java.ads.protocol.util.SingleMessageRateLimiter;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
 import org.apache.plc4x.java.base.connection.SerialChannelFactory;
 import org.apache.plc4x.java.base.protocol.SingleItemToSingleRequestProtocol;
 
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 public class AdsSerialPlcConnection extends AdsAbstractPlcConnection {
@@ -62,7 +65,7 @@ protected void initChannel(Channel channel) {
                 pipeline.addLast(new SingleMessageRateLimiter());
                 pipeline.addLast(new Ads2PayloadProtocol());
                 pipeline.addLast(new Plc4x2AdsProtocol(targetAmsNetId, targetAmsPort, sourceAmsNetId, sourceAmsPort, fieldMapping));
-                pipeline.addLast(new SingleItemToSingleRequestProtocol(timer));
+                pipeline.addLast(new SingleItemToSingleRequestProtocol(AdsSerialPlcConnection.this, AdsSerialPlcConnection.this, timer));
             }
         };
     }
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
index 80fb5bb4f..406557f46 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
@@ -32,7 +32,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.plc4x.java.ads.protocol.Ads2PayloadProtocol;
 import org.apache.plc4x.java.ads.protocol.Payload2TcpProtocol;
 import org.apache.plc4x.java.ads.protocol.Plc4x2AdsProtocol;
-import org.apache.plc4x.java.api.connection.PlcSubscriber;
+import org.apache.plc4x.java.base.messages.PlcSubscriber;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.exceptions.PlcNotImplementedException;
 import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
@@ -43,7 +43,6 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.plc4x.java.api.types.PlcResponseCode;
 import org.apache.plc4x.java.base.connection.TcpSocketChannelFactory;
 import org.apache.plc4x.java.base.messages.*;
-import org.apache.plc4x.java.base.model.DefaultPlcConsumerRegistration;
 import org.apache.plc4x.java.base.model.InternalPlcConsumerRegistration;
 import org.apache.plc4x.java.base.model.InternalPlcSubscriptionHandle;
 import org.apache.plc4x.java.base.protocol.SingleItemToSingleRequestProtocol;
@@ -53,7 +52,10 @@ Licensed to the Apache Software Foundation (ASF) under one
 import java.net.Inet4Address;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
@@ -114,7 +116,7 @@ protected void initChannel(Channel channel) {
                 pipeline.addLast(new Payload2TcpProtocol());
                 pipeline.addLast(new Ads2PayloadProtocol());
                 pipeline.addLast(new Plc4x2AdsProtocol(targetAmsNetId, targetAmsPort, sourceAmsNetId, sourceAmsPort, fieldMapping));
-                pipeline.addLast(new SingleItemToSingleRequestProtocol(timer));
+                pipeline.addLast(new SingleItemToSingleRequestProtocol(AdsTcpPlcConnection.this, AdsTcpPlcConnection.this, timer)); // TODO: remove nulls; implement correctly
             }
         };
     }
@@ -204,9 +206,9 @@ else if (field instanceof DirectAdsField) {
 
         // Send the request to the plc and wait for a response
         // TODO: This is blocking, should be changed to be async.
-        CompletableFuture<InternalPlcProprietaryResponse<InternalPlcProprietaryRequest<AdsAddDeviceNotificationRequest>, AdsAddDeviceNotificationResponse>> addDeviceFuture = new CompletableFuture<>();
+        CompletableFuture<InternalPlcProprietaryResponse<AdsAddDeviceNotificationResponse>> addDeviceFuture = new CompletableFuture<>();
         channel.writeAndFlush(new PlcRequestContainer<>(new DefaultPlcProprietaryRequest<>(adsAddDeviceNotificationRequest), addDeviceFuture));
-        InternalPlcProprietaryResponse<InternalPlcProprietaryRequest<AdsAddDeviceNotificationRequest>, AdsAddDeviceNotificationResponse> addDeviceResponse = getFromFuture(addDeviceFuture, ADD_DEVICE_TIMEOUT);
+        InternalPlcProprietaryResponse<AdsAddDeviceNotificationResponse> addDeviceResponse = getFromFuture(addDeviceFuture, ADD_DEVICE_TIMEOUT);
         AdsAddDeviceNotificationResponse response = addDeviceResponse.getResponse();
 
         // Abort if we got anything but a successful response.
@@ -241,11 +243,11 @@ else if (field instanceof DirectAdsField) {
                         Invoke.NONE,
                         adsSubscriptionHandle.getNotificationHandle()
                     );
-                CompletableFuture<InternalPlcProprietaryResponse<DefaultPlcProprietaryRequest<AdsDeleteDeviceNotificationRequest>, AdsDeleteDeviceNotificationResponse>> deleteDeviceFuture =
+                CompletableFuture<InternalPlcProprietaryResponse<AdsDeleteDeviceNotificationResponse>> deleteDeviceFuture =
                     new CompletableFuture<>();
                 channel.writeAndFlush(new PlcRequestContainer<>(new DefaultPlcProprietaryRequest<>(adsDeleteDeviceNotificationRequest), deleteDeviceFuture));
 
-                InternalPlcProprietaryResponse<DefaultPlcProprietaryRequest<AdsDeleteDeviceNotificationRequest>, AdsDeleteDeviceNotificationResponse> deleteDeviceResponse =
+                InternalPlcProprietaryResponse<AdsDeleteDeviceNotificationResponse> deleteDeviceResponse =
                     getFromFuture(deleteDeviceFuture, DEL_DEVICE_TIMEOUT);
                 AdsDeleteDeviceNotificationResponse response = deleteDeviceResponse.getResponse();
                 if (response.getResult().toAdsReturnCode() != AdsReturnCode.ADS_CODE_0) {
@@ -260,10 +262,11 @@ else if (field instanceof DirectAdsField) {
 
     @Override
     public PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer, Collection<PlcSubscriptionHandle> handles) {
-        return register(consumer, handles.toArray(new PlcSubscriptionHandle[0]));
+        throw new UnsupportedOperationException("Consumer registration is not implemented"); // array-based overload is commented out below; delegating here would recurse into this method forever
     }
 
-    @Override
+    // TODO: figure out what this is
+    /*@Override
     public InternalPlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer, PlcSubscriptionHandle... handles) {
         Objects.requireNonNull(consumer);
         Objects.requireNonNull(handles);
@@ -297,7 +300,7 @@ public InternalPlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> c
         getChannel().pipeline().get(Plc4x2AdsProtocol.class).addConsumer(adsDeviceNotificationRequestConsumer);
 
         return internalPlcConsumerRegistration;
-    }
+    }*/
 
     @Override
     public void unregister(PlcConsumerRegistration plcConsumerRegistration) {
@@ -310,13 +313,13 @@ public void unregister(PlcConsumerRegistration plcConsumerRegistration) {
     }
 
     @Override
-    public PlcSubscriptionRequest.Builder subscriptionRequestBuilder() {
-        return new DefaultPlcSubscriptionRequest.Builder(new AdsPlcFieldHandler());
+    public Optional<PlcSubscriptionRequest.Builder> subscriptionRequestBuilder() {
+        return Optional.of(new DefaultPlcSubscriptionRequest.Builder(this, new AdsPlcFieldHandler()));
     }
 
     @Override
-    public PlcUnsubscriptionRequest.Builder unsubscriptionRequestBuilder() {
-        return new DefaultPlcUnsubscriptionRequest.Builder();
+    public Optional<PlcUnsubscriptionRequest.Builder> unsubscriptionRequestBuilder() {
+        return Optional.of(new DefaultPlcUnsubscriptionRequest.Builder(this));
     }
 
     @Override
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/Plc4x2AdsProtocol.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/Plc4x2AdsProtocol.java
index e128bde17..fbb359e7a 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/Plc4x2AdsProtocol.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/Plc4x2AdsProtocol.java
@@ -35,7 +35,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.plc4x.java.api.exceptions.PlcIoException;
 import org.apache.plc4x.java.api.exceptions.PlcProtocolException;
 import org.apache.plc4x.java.api.exceptions.PlcProtocolPayloadTooBigException;
-import org.apache.plc4x.java.api.messages.PlcProprietaryRequest;
+import org.apache.plc4x.java.base.messages.PlcProprietaryRequest;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcRequest;
 import org.apache.plc4x.java.api.messages.PlcWriteRequest;
diff --git a/plc4j/protocols/ads/src/main/resources/META-INF/services/org.apache.plc4x.java.api.PlcDriver b/plc4j/protocols/ads/src/main/resources/META-INF/services/org.apache.plc4x.java.spi.PlcDriver
similarity index 100%
rename from plc4j/protocols/ads/src/main/resources/META-INF/services/org.apache.plc4x.java.api.PlcDriver
rename to plc4j/protocols/ads/src/main/resources/META-INF/services/org.apache.plc4x.java.spi.PlcDriver
diff --git a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
index b4da5109d..491f606c3 100644
--- a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
+++ b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
@@ -19,17 +19,11 @@ Licensed to the Apache Software Foundation (ASF) under one
 package org.apache.plc4x.java.ads;
 
 import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcReader;
-import org.apache.plc4x.java.api.connection.PlcSubscriber;
-import org.apache.plc4x.java.api.messages.PlcReadResponse;
-import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
-import org.apache.plc4x.java.api.messages.PlcUnsubscriptionResponse;
-import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
+import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.messages.*;
 
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
 
 public class ManualPlc4XAdsTest {
 
@@ -45,29 +39,29 @@ public static void main(String... args) throws Exception {
         try (PlcConnection plcConnection = new PlcDriverManager().getConnection(connectionUrl)) {
             System.out.println("PlcConnection " + plcConnection);
 
-            PlcReader reader = plcConnection.getReader().orElseThrow(() -> new RuntimeException("No Reader found"));
-
-            CompletableFuture<PlcReadResponse<?>> response = reader.read(builder -> builder.addItem("station", "Allgemein_S2.Station:BYTE"));
-            PlcReadResponse<?> readResponse = response.get();
+            PlcReadRequest readRequest = plcConnection.readRequestBuilder().get().addItem("station", "Allgemein_S2.Station:BYTE").build();
+            CompletableFuture<? extends PlcReadResponse> response = readRequest.execute();
+            PlcReadResponse readResponse = response.get();
             System.out.println("Response " + readResponse);
             Collection<Integer> stations = readResponse.getAllIntegers("station");
             stations.forEach(System.out::println);
 
-            PlcSubscriber plcSubscriber = plcConnection.getSubscriber().orElseThrow(() -> new RuntimeException("Subscribe not available"));
-
-            CompletableFuture<PlcSubscriptionResponse> subscribeResponse = plcSubscriber.subscribe(builder -> builder.addChangeOfStateField("stationChange", "Allgemein_S2.Station:BYTE"));
+            PlcSubscriptionRequest subscriptionRequest = plcConnection.subscriptionRequestBuilder().get().addChangeOfStateField("stationChange", "Allgemein_S2.Station:BYTE").build();
+            CompletableFuture<? extends PlcSubscriptionResponse> subscribeResponse = subscriptionRequest.execute();
             PlcSubscriptionResponse plcSubscriptionResponse = subscribeResponse.get();
 
-            PlcConsumerRegistration plcConsumerRegistration = plcSubscriber.register(System.out::println, plcSubscriptionResponse.getSubscriptionHandles());
+            // TODO: figure out what to do with this
+            /*PlcConsumerRegistration plcConsumerRegistration = plcSubscriber.register(System.out::println, plcSubscriptionResponse.getSubscriptionHandles());
 
             TimeUnit.SECONDS.sleep(5);
 
             plcSubscriber.unregister(plcConsumerRegistration);
-            CompletableFuture<PlcUnsubscriptionResponse> unsubscriptionResponse = plcSubscriber.unsubscribe(builder -> builder.addHandles(plcSubscriptionResponse.getSubscriptionHandles()));
+            PlcUnsubscriptionRequest unsubscriptionRequest = plcConnection.unsubscriptionRequestBuilder().get().addHandles(plcSubscriptionResponse.getSubscriptionHandles()).build();
+            CompletableFuture<PlcUnsubscriptionResponse> unsubscriptionResponse = plcSubscriber.unsubscribe(unsubscriptionRequest);
 
             unsubscriptionResponse
                 .get(5, TimeUnit.SECONDS);
-            System.out.println(unsubscriptionResponse);
+            System.out.println(unsubscriptionResponse);*/
         }
         System.exit(0);
     }
diff --git a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/adslib/AmsRequest.java b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/adslib/AmsRequest.java
index 20b865e88..a75e033f7 100644
--- a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/adslib/AmsRequest.java
+++ b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/adslib/AmsRequest.java
@@ -19,7 +19,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 package org.apache.plc4x.java.ads.adslib;
 
 import org.apache.plc4x.java.ads.api.generic.AmsPacket;
-import org.apache.plc4x.java.api.messages.PlcProprietaryRequest;
+import org.apache.plc4x.java.base.messages.PlcProprietaryRequest;
 import org.apache.plc4x.java.base.messages.DefaultPlcProprietaryRequest;
 
 import java.util.concurrent.CompletableFuture;
diff --git a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/adslib/AmsRouter.java b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/adslib/AmsRouter.java
index 935fc87c1..2ed462490 100644
--- a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/adslib/AmsRouter.java
+++ b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/adslib/AmsRouter.java
@@ -29,8 +29,8 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.plc4x.java.ads.connection.AdsTcpPlcConnection;
 import org.apache.plc4x.java.ads.protocol.Plc4x2AdsProtocol;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.api.messages.PlcProprietaryRequest;
-import org.apache.plc4x.java.api.messages.PlcProprietaryResponse;
+import org.apache.plc4x.java.base.messages.PlcProprietaryRequest;
+import org.apache.plc4x.java.base.messages.PlcProprietaryResponse;
 
 import java.io.IOException;
 import java.net.*;
@@ -202,9 +202,9 @@ synchronized AdsTcpPlcConnection GetConnection(AmsNetId amsDest) {
         if (ads == null) {
             return AmsError.of(AdsReturnCode.ADS_CODE_7);
         }
-        CompletableFuture<PlcProprietaryResponse<PlcProprietaryRequest<T>, R>> completableFuture = ads.send(plcProprietaryRequest);
+        CompletableFuture<PlcProprietaryResponse<R>> completableFuture = ads.send(plcProprietaryRequest);
         try {
-            PlcProprietaryResponse<PlcProprietaryRequest<T>, R> response = completableFuture.get(3, TimeUnit.SECONDS);
+            PlcProprietaryResponse<R> response = completableFuture.get(3, TimeUnit.SECONDS);
             request.getResponseFuture().complete(response.getResponse());
             return response.getResponse().getAmsHeader().getCode();
         } catch (ExecutionException | TimeoutException e) {
@@ -229,9 +229,9 @@ AmsError AddNotification(AmsRequest<AdsAddDeviceNotificationRequest, AdsAddDevic
         }
 
         AdsLibPort port = ports.get(plcProprietaryRequest.getProprietaryRequest().getAmsHeader().getSourceAmsPort().getAsInt());
-        CompletableFuture<PlcProprietaryResponse<PlcProprietaryRequest<AdsAddDeviceNotificationRequest>, AdsAddDeviceNotificationResponse>> send = ads.send(plcProprietaryRequest);
+        CompletableFuture<PlcProprietaryResponse<AdsAddDeviceNotificationResponse>> send = ads.send(plcProprietaryRequest);
         try {
-            PlcProprietaryResponse<PlcProprietaryRequest<AdsAddDeviceNotificationRequest>, AdsAddDeviceNotificationResponse> response = send.get(3, TimeUnit.SECONDS);
+            PlcProprietaryResponse<AdsAddDeviceNotificationResponse> response = send.get(3, TimeUnit.SECONDS);
             if (response.getResponse().getResult().toAdsReturnCode() != AdsReturnCode.ADS_CODE_0) {
                 return AmsError.of(response.getResponse().getResult().getAsLong());
             }
@@ -258,9 +258,9 @@ AmsError DelNotification(int port, ImmutablePair<AmsNetId, AmsPort> pAddr, AmsRe
         }
 
         AdsLibPort adsLibPort = ports.get(port);
-        CompletableFuture<PlcProprietaryResponse<PlcProprietaryRequest<AdsDeleteDeviceNotificationRequest>, AdsDeleteDeviceNotificationResponse>> send = ads.send(plcProprietaryRequest);
+        CompletableFuture<PlcProprietaryResponse<AdsDeleteDeviceNotificationResponse>> send = ads.send(plcProprietaryRequest);
         try {
-            PlcProprietaryResponse<PlcProprietaryRequest<AdsDeleteDeviceNotificationRequest>, AdsDeleteDeviceNotificationResponse> response = send.get(3, TimeUnit.SECONDS);
+            PlcProprietaryResponse<AdsDeleteDeviceNotificationResponse> response = send.get(3, TimeUnit.SECONDS);
 
             adsLibPort.DelNotification(pAddr, plcProprietaryRequest.getProprietaryRequest().getNotificationHandle());
             request.getResponseFuture().complete(response.getResponse());
diff --git a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnectionTest.java b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnectionTest.java
index 69ca500da..a1517b37c 100644
--- a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnectionTest.java
+++ b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnectionTest.java
@@ -33,7 +33,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.plc4x.java.ads.model.SymbolicAdsField;
 import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
 import org.apache.plc4x.java.api.messages.PlcFieldRequest;
-import org.apache.plc4x.java.api.messages.PlcProprietaryResponse;
+import org.apache.plc4x.java.base.messages.PlcProprietaryResponse;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
 import org.apache.plc4x.java.api.messages.PlcWriteResponse;
 import org.apache.plc4x.java.base.connection.ChannelFactory;
@@ -123,7 +123,7 @@ public void getSourceAmsPort() {
 
     @Test
     public void read() {
-        CompletableFuture<PlcReadResponse<?>> read = SUT.read(mock(InternalPlcReadRequest.class));
+        CompletableFuture<PlcReadResponse> read = SUT.read(mock(InternalPlcReadRequest.class));
         assertNotNull(read);
 
         simulatePipelineError(() -> SUT.read(mock(InternalPlcReadRequest.class)));
@@ -131,7 +131,7 @@ public void read() {
 
     @Test
     public void write() {
-        CompletableFuture<PlcWriteResponse<?>> write = SUT.write(mock(InternalPlcWriteRequest.class));
+        CompletableFuture<PlcWriteResponse> write = SUT.write(mock(InternalPlcWriteRequest.class));
         assertNotNull(write);
 
         simulatePipelineError(() -> SUT.write(mock(InternalPlcWriteRequest.class)));
diff --git a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsSerialPlcConnectionTest.java b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsSerialPlcConnectionTest.java
index a4a88250d..624e607a8 100644
--- a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsSerialPlcConnectionTest.java
+++ b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsSerialPlcConnectionTest.java
@@ -28,6 +28,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.plc4x.java.ads.api.serial.AmsSerialAcknowledgeFrame;
 import org.apache.plc4x.java.ads.api.serial.AmsSerialFrame;
 import org.apache.plc4x.java.ads.api.serial.types.*;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
 import org.apache.plc4x.java.base.connection.AbstractPlcConnection;
 import org.apache.plc4x.java.base.connection.SerialChannelFactory;
@@ -75,7 +76,8 @@ public void initialState() {
     @Test
     public void testRead() throws Exception {
         prepareSerialSimulator();
-        CompletableFuture<PlcReadResponse<?>> read = SUT.read(builder -> builder.addItem("test", "0/0:BYTE"));
+        PlcReadRequest request = SUT.readRequestBuilder().get().addItem("test", "0/0:BYTE").build();
+        CompletableFuture<PlcReadResponse> read = SUT.read(request);
         PlcReadResponse plcReadResponse = read.get(30, TimeUnit.SECONDS);
         assertNotNull(plcReadResponse);
     }
diff --git a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/protocol/Plc4x2AdsProtocolTest.java b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/protocol/Plc4x2AdsProtocolTest.java
index 95c662807..2cdd44932 100644
--- a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/protocol/Plc4x2AdsProtocolTest.java
+++ b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/protocol/Plc4x2AdsProtocolTest.java
@@ -110,14 +110,14 @@ Licensed to the Apache Software Foundation (ASF) under one
             .map(pair -> Stream.of(
                 ImmutablePair.of(
                     new PlcRequestContainer<>(
-                        (InternalPlcRequest) new DefaultPlcWriteRequest.Builder(new AdsPlcFieldHandler())
+                        (InternalPlcRequest) new DefaultPlcWriteRequest.Builder(null, new AdsPlcFieldHandler()) // TODO: remove null
                             .addItem(RandomStringUtils.randomAscii(10), "1/1:" + pair.adsDataType, pair.getValue())
                             .build(), new CompletableFuture<>()),
                     AdsWriteResponse.of(targetAmsNetId, targetAmsPort, sourceAmsNetId, sourceAmsPort, invokeId, Result.of(0))
                 ),
                 ImmutablePair.of(
                     new PlcRequestContainer<>(
-                        (InternalPlcRequest) new DefaultPlcReadRequest.Builder(new AdsPlcFieldHandler())
+                        (InternalPlcRequest) new DefaultPlcReadRequest.Builder(null, new AdsPlcFieldHandler()) // TODO: remove null
                             .addItem(RandomStringUtils.randomAscii(10), "1/1:" + pair.adsDataType)
                             .build(), new CompletableFuture<>()),
                     AdsReadResponse.of(targetAmsNetId, targetAmsPort, sourceAmsNetId, sourceAmsPort, invokeId, Result.of(0), Data.of(pair.getByteRepresentation()))
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/connection/AbstractPlcConnection.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/connection/AbstractPlcConnection.java
index 8dcedd8f5..42989d632 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/connection/AbstractPlcConnection.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/connection/AbstractPlcConnection.java
@@ -22,15 +22,11 @@ Licensed to the Apache Software Foundation (ASF) under one
 import io.netty.channel.ChannelHandler;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
-import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcReader;
-import org.apache.plc4x.java.api.connection.PlcSubscriber;
-import org.apache.plc4x.java.api.connection.PlcWriter;
+import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.exceptions.PlcIoException;
 
 import java.util.Objects;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
@@ -115,30 +111,6 @@ protected void sendChannelCreatedEvent() {
         // Implemented in sub-classes, if needed.
     }
 
-    @Override
-    public Optional<PlcReader> getReader() {
-        if (this instanceof PlcReader) {
-            return Optional.of((PlcReader) this);
-        }
-        return Optional.empty();
-    }
-
-    @Override
-    public Optional<PlcWriter> getWriter() {
-        if (this instanceof PlcWriter) {
-            return Optional.of((PlcWriter) this);
-        }
-        return Optional.empty();
-    }
-
-    @Override
-    public Optional<PlcSubscriber> getSubscriber() {
-        if (this instanceof PlcSubscriber) {
-            return Optional.of((PlcSubscriber) this);
-        }
-        return Optional.empty();
-    }
-
     /**
      * Can be used to check and cast a parameter to its required internal type (can be used for general type checking too).
      *
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcProprietaryRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcProprietaryRequest.java
index fa716f0bf..4e738998e 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcProprietaryRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcProprietaryRequest.java
@@ -18,8 +18,17 @@ Licensed to the Apache Software Foundation (ASF) under one
  */
 package org.apache.plc4x.java.base.messages;
 
+import org.apache.plc4x.java.api.messages.PlcResponse;
+
+import java.util.concurrent.CompletableFuture;
+
 public class DefaultPlcProprietaryRequest<REQUEST> implements InternalPlcProprietaryRequest<REQUEST> {
 
+    @Override
+    public CompletableFuture<PlcResponse> execute() {
+        throw new UnsupportedOperationException("not supported"); // standard unchecked type for an unsupported operation; TODO: figure out what to do with this
+    }
+
     private REQUEST proprietaryRequest;
 
     public DefaultPlcProprietaryRequest(REQUEST proprietaryRequest) {
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcProprietaryResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcProprietaryResponse.java
index 679fde12d..69ee2d08e 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcProprietaryResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcProprietaryResponse.java
@@ -18,13 +18,13 @@ Licensed to the Apache Software Foundation (ASF) under one
  */
 package org.apache.plc4x.java.base.messages;
 
-public class DefaultPlcProprietaryResponse<REQUEST, RESPONSE> implements InternalPlcProprietaryResponse<REQUEST, RESPONSE> {
+public class DefaultPlcProprietaryResponse<RESPONSE> implements InternalPlcProprietaryResponse<RESPONSE> {
 
-    private final InternalPlcProprietaryRequest<REQUEST> plcProprietaryRequest;
+    private final InternalPlcProprietaryRequest plcProprietaryRequest;
 
     private final RESPONSE proprietaryResponse;
 
-    public DefaultPlcProprietaryResponse(InternalPlcProprietaryRequest<REQUEST> plcProprietaryRequest, RESPONSE proprietaryResponse) {
+    public DefaultPlcProprietaryResponse(InternalPlcProprietaryRequest plcProprietaryRequest, RESPONSE proprietaryResponse) {
         this.plcProprietaryRequest = plcProprietaryRequest;
         this.proprietaryResponse = proprietaryResponse;
     }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcReadRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcReadRequest.java
index 9b60fe4f7..b08fd8586 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcReadRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcReadRequest.java
@@ -21,20 +21,29 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
 import org.apache.plc4x.java.api.model.PlcField;
 import org.apache.plc4x.java.base.connection.PlcFieldHandler;
 
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
 public class DefaultPlcReadRequest implements InternalPlcReadRequest, InternalPlcFieldRequest {
 
+    private final PlcReader reader;
     private LinkedHashMap<String, PlcField> fields;
 
-    protected DefaultPlcReadRequest(LinkedHashMap<String, PlcField> fields) {
+    protected DefaultPlcReadRequest(PlcReader reader, LinkedHashMap<String, PlcField> fields) {
+        this.reader = reader;
         this.fields = fields;
     }
 
+    @Override
+    public CompletableFuture<PlcReadResponse> execute() {
+        return reader.read(this);
+    }
+
     @Override
     public int getNumberOfFields() {
         return fields.size();
@@ -64,12 +73,18 @@ public PlcField getField(String name) {
             .collect(Collectors.toCollection(LinkedList::new));
     }
 
+    protected PlcReader getReader() {
+        return reader;
+    }
+
     public static class Builder implements PlcReadRequest.Builder {
 
+        private final PlcReader reader;
         private final PlcFieldHandler fieldHandler;
         private final Map<String, String> fields;
 
-        public Builder(PlcFieldHandler fieldHandler) {
+        public Builder(PlcReader reader, PlcFieldHandler fieldHandler) {
+            this.reader = reader;
             this.fieldHandler = fieldHandler;
             fields = new TreeMap<>();
         }
@@ -90,7 +105,7 @@ public PlcReadRequest build() {
                 PlcField parsedField = fieldHandler.createField(fieldQuery);
                 parsedFields.put(name, parsedField);
             });
-            return new DefaultPlcReadRequest(parsedFields);
+            return new DefaultPlcReadRequest(reader, parsedFields);
         }
 
     }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
index 3194cf58b..815b46e04 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
@@ -21,6 +21,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
 import org.apache.plc4x.java.api.model.PlcField;
 import org.apache.plc4x.java.api.types.PlcSubscriptionType;
 import org.apache.plc4x.java.base.connection.PlcFieldHandler;
@@ -28,11 +29,23 @@ Licensed to the Apache Software Foundation (ASF) under one
 
 import java.time.Duration;
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.BiFunction;
 
 // TODO: request broken needs finishing.
 public class DefaultPlcSubscriptionRequest implements InternalPlcSubscriptionRequest, InternalPlcFieldRequest {
 
+    private final PlcSubscriber subscriber;
+
+    public DefaultPlcSubscriptionRequest(PlcSubscriber subscriber) {
+        this.subscriber = subscriber;
+    }
+
+    @Override
+    public CompletableFuture<PlcSubscriptionResponse> execute() {
+        return subscriber.subscribe(this);
+    }
+
     @Override
     public int getNumberOfFields() {
         throw new IllegalStateException("not available");
@@ -65,10 +78,12 @@ public PlcSubscriptionType getPlcSubscriptionType() {
 
     public static class Builder implements PlcSubscriptionRequest.Builder {
 
+        private final PlcSubscriber subscriber;
         private final PlcFieldHandler fieldHandler;
         private final Map<String, BuilderItem<Object>> fields;
 
-        public Builder(PlcFieldHandler fieldHandler) {
+        public Builder(PlcSubscriber subscriber, PlcFieldHandler fieldHandler) {
+            this.subscriber = subscriber;
             this.fieldHandler = fieldHandler;
             fields = new TreeMap<>();
         }
@@ -99,7 +114,7 @@ public PlcSubscriptionRequest build() {
                 FieldItem fieldItem = builderItem.encoder.apply(parsedField, null);
                 parsedFields.put(name, new ImmutablePair<>(parsedField, fieldItem));
             });
-            return new DefaultPlcSubscriptionRequest();
+            return new DefaultPlcSubscriptionRequest(subscriber);
         }
 
         private static class BuilderItem<T> {
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionRequest.java
index 03e1f3e9d..3e13d575c 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionRequest.java
@@ -20,22 +20,32 @@ Licensed to the Apache Software Foundation (ASF) under one
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionResponse;
 import org.apache.plc4x.java.api.model.PlcField;
 import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
 import org.apache.plc4x.java.base.model.InternalPlcSubscriptionHandle;
 
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
 // TODO: request broken needs finishing.
 public class DefaultPlcUnsubscriptionRequest implements InternalPlcUnsubscriptionRequest, InternalPlcFieldRequest {
 
+    private final PlcSubscriber subscriber;
+
     private final Collection<? extends InternalPlcSubscriptionHandle> internalPlcSubscriptionHandles;
 
-    public DefaultPlcUnsubscriptionRequest(Collection<? extends InternalPlcSubscriptionHandle> internalPlcSubscriptionHandles) {
+    public DefaultPlcUnsubscriptionRequest(PlcSubscriber subscriber, Collection<? extends InternalPlcSubscriptionHandle> internalPlcSubscriptionHandles) {
+        this.subscriber = subscriber;
         this.internalPlcSubscriptionHandles = internalPlcSubscriptionHandles;
     }
 
+    @Override
+    public CompletableFuture<PlcUnsubscriptionResponse> execute() {
+        return subscriber.unsubscribe(this);
+    }
+
     @Override
     public int getNumberOfFields() {
         throw new IllegalStateException("not available");
@@ -68,9 +78,11 @@ public PlcField getField(String name) {
 
     public static class Builder implements PlcUnsubscriptionRequest.Builder {
 
+        private final PlcSubscriber subscriber;
         private List<InternalPlcSubscriptionHandle> plcSubscriptionHandles;
 
-        public Builder() {
+        public Builder(PlcSubscriber subscriber) {
+            this.subscriber = subscriber;
             plcSubscriptionHandles = new ArrayList<>();
         }
 
@@ -94,7 +106,7 @@ public Builder() {
 
         @Override
         public PlcUnsubscriptionRequest build() {
-            return new DefaultPlcUnsubscriptionRequest(plcSubscriptionHandles);
+            return new DefaultPlcUnsubscriptionRequest(subscriber, plcSubscriptionHandles);
         }
 
 
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcWriteRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcWriteRequest.java
index c78ff08a1..633030055 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcWriteRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcWriteRequest.java
@@ -23,6 +23,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
 import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.java.api.messages.PlcWriteResponse;
 import org.apache.plc4x.java.api.model.PlcField;
 import org.apache.plc4x.java.base.connection.PlcFieldHandler;
 import org.apache.plc4x.java.base.messages.items.FieldItem;
@@ -33,17 +34,25 @@ Licensed to the Apache Software Foundation (ASF) under one
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 
 public class DefaultPlcWriteRequest implements InternalPlcWriteRequest, InternalPlcFieldRequest {
 
+    private final PlcWriter writer;
     private final LinkedHashMap<String, Pair<PlcField, FieldItem>> fields;
 
-    protected DefaultPlcWriteRequest(LinkedHashMap<String, Pair<PlcField, FieldItem>> fields) {
+    protected DefaultPlcWriteRequest(PlcWriter writer, LinkedHashMap<String, Pair<PlcField, FieldItem>> fields) {
+        this.writer = writer;
         this.fields = fields;
     }
 
+    @Override
+    public CompletableFuture<PlcWriteResponse> execute() {
+        return writer.write(this);
+    }
+
     @Override
     public int getNumberOfFields() {
         return fields.size();
@@ -106,11 +115,13 @@ public int getNumberOfValues(String name) {
 
     public static class Builder implements PlcWriteRequest.Builder {
 
+        private final PlcWriter writer;
         private final PlcFieldHandler fieldHandler;
         private final Map<String, BuilderItem<Object>> fields;
         private final Map<Class<?>, BiFunction<PlcField, Object[], FieldItem>> handlerMap;
 
-        public Builder(PlcFieldHandler fieldHandler) {
+        public Builder(PlcWriter writer, PlcFieldHandler fieldHandler) {
+            this.writer = writer;
             this.fieldHandler = fieldHandler;
             fields = new TreeMap<>();
             handlerMap = new HashMap<>();
@@ -236,7 +247,7 @@ public PlcWriteRequest build() {
                 FieldItem fieldItem = builderItem.encoder.apply(parsedField, builderItem.values);
                 parsedFields.put(name, new ImmutablePair<>(parsedField, fieldItem));
             });
-            return new DefaultPlcWriteRequest(parsedFields);
+            return new DefaultPlcWriteRequest(writer, parsedFields);
         }
 
         private Builder addItem(String name, String fieldQuery, Object[] values, BiFunction<PlcField, Object[], FieldItem> encoder) {
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcFieldResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcFieldResponse.java
index 8486388ad..78f632003 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcFieldResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcFieldResponse.java
@@ -20,7 +20,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 
 import org.apache.plc4x.java.api.messages.PlcFieldResponse;
 
-public interface InternalPlcFieldResponse<REQUEST_TYPE extends InternalPlcFieldRequest> extends PlcFieldResponse<REQUEST_TYPE> {
+public interface InternalPlcFieldResponse extends PlcFieldResponse {
 
 
 }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcProprietaryRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcProprietaryRequest.java
index 3e2ed9f3c..33b5272aa 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcProprietaryRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcProprietaryRequest.java
@@ -18,7 +18,5 @@ Licensed to the Apache Software Foundation (ASF) under one
  */
 package org.apache.plc4x.java.base.messages;
 
-import org.apache.plc4x.java.api.messages.PlcProprietaryRequest;
-
 public interface InternalPlcProprietaryRequest<REQUEST> extends PlcProprietaryRequest<REQUEST>, InternalPlcRequest {
 }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcProprietaryResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcProprietaryResponse.java
index eb5aa3b0a..d42dabfcf 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcProprietaryResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcProprietaryResponse.java
@@ -18,7 +18,5 @@ Licensed to the Apache Software Foundation (ASF) under one
  */
 package org.apache.plc4x.java.base.messages;
 
-import org.apache.plc4x.java.api.messages.PlcProprietaryResponse;
-
-public interface InternalPlcProprietaryResponse<REQUEST, RESPONSE> extends PlcProprietaryResponse<InternalPlcProprietaryRequest<REQUEST>, RESPONSE>, InternalPlcResponse<InternalPlcProprietaryRequest<REQUEST>> {
+public interface InternalPlcProprietaryResponse<RESPONSE> extends PlcProprietaryResponse<RESPONSE>, InternalPlcResponse {
 }
\ No newline at end of file
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcReadResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcReadResponse.java
index 775a4c7bd..53cf0b5c1 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcReadResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcReadResponse.java
@@ -25,7 +25,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 
 import java.util.Map;
 
-public interface InternalPlcReadResponse extends PlcReadResponse<InternalPlcReadRequest>, InternalPlcResponse<InternalPlcReadRequest> {
+public interface InternalPlcReadResponse extends PlcReadResponse, InternalPlcResponse {
 
     Map<String, Pair<PlcResponseCode, FieldItem>> getValues();
 }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcResponse.java
index 95c9223f7..dde16f068 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcResponse.java
@@ -20,5 +20,6 @@ Licensed to the Apache Software Foundation (ASF) under one
 
 import org.apache.plc4x.java.api.messages.PlcResponse;
 
-public interface InternalPlcResponse<REQUEST_TYPE extends InternalPlcRequest> extends PlcResponse<REQUEST_TYPE> {
+public interface InternalPlcResponse extends PlcResponse {
+
 }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcWriteResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcWriteResponse.java
index dbcdc5c0e..06d4ad7c7 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcWriteResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcWriteResponse.java
@@ -23,6 +23,6 @@ Licensed to the Apache Software Foundation (ASF) under one
 
 import java.util.Map;
 
-public interface InternalPlcWriteResponse extends PlcWriteResponse<InternalPlcWriteRequest>, InternalPlcResponse<InternalPlcWriteRequest> {
+public interface InternalPlcWriteResponse extends PlcWriteResponse, InternalPlcResponse {
     Map<String, PlcResponseCode> getValues();
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcProprietaryRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcProprietaryRequest.java
similarity index 89%
rename from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcProprietaryRequest.java
rename to plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcProprietaryRequest.java
index a777abfe7..fdc755889 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcProprietaryRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcProprietaryRequest.java
@@ -16,7 +16,9 @@ Licensed to the Apache Software Foundation (ASF) under one
  specific language governing permissions and limitations
  under the License.
  */
-package org.apache.plc4x.java.api.messages;
+package org.apache.plc4x.java.base.messages;
+
+import org.apache.plc4x.java.api.messages.PlcRequest;
 
 public interface PlcProprietaryRequest<REQUEST> extends PlcRequest {
     REQUEST getProprietaryRequest();
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcProprietaryResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcProprietaryResponse.java
similarity index 80%
rename from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcProprietaryResponse.java
rename to plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcProprietaryResponse.java
index 2e3bf5838..94af88591 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcProprietaryResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcProprietaryResponse.java
@@ -16,8 +16,10 @@ Licensed to the Apache Software Foundation (ASF) under one
  specific language governing permissions and limitations
  under the License.
  */
-package org.apache.plc4x.java.api.messages;
+package org.apache.plc4x.java.base.messages;
 
-public interface PlcProprietaryResponse<REQUEST extends PlcProprietaryRequest, RESPONSE> extends PlcResponse<REQUEST> {
-    RESPONSE getResponse();
+import org.apache.plc4x.java.api.messages.PlcResponse;
+
+public interface PlcProprietaryResponse<T> extends PlcResponse {
+    T getResponse();
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcProprietarySender.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcProprietarySender.java
similarity index 70%
rename from plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcProprietarySender.java
rename to plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcProprietarySender.java
index afadad0ad..8a0ed3d7a 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcProprietarySender.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcProprietarySender.java
@@ -16,14 +16,10 @@ Licensed to the Apache Software Foundation (ASF) under one
  specific language governing permissions and limitations
  under the License.
  */
-package org.apache.plc4x.java.api.connection;
-
-import org.apache.plc4x.java.api.messages.PlcProprietaryRequest;
-import org.apache.plc4x.java.api.messages.PlcProprietaryResponse;
+package org.apache.plc4x.java.base.messages;
 
 import java.util.concurrent.CompletableFuture;
 
 public interface PlcProprietarySender {
-
-    <PROP_REQUEST, PROP_RESPONSE> CompletableFuture<PlcProprietaryResponse<PlcProprietaryRequest<PROP_REQUEST>, PROP_RESPONSE>> send(PlcProprietaryRequest<PROP_REQUEST> proprietaryRequest);
+    <T> CompletableFuture<PlcProprietaryResponse<T>> send(PlcProprietaryRequest proprietaryRequest);
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcProtocolMessage.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcProtocolMessage.java
similarity index 94%
rename from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcProtocolMessage.java
rename to plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcProtocolMessage.java
index 4c428d54e..cacc6b14a 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcProtocolMessage.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcProtocolMessage.java
@@ -16,7 +16,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 specific language governing permissions and limitations
 under the License.
 */
-package org.apache.plc4x.java.api.messages;
+package org.apache.plc4x.java.base.messages;
 
 public interface PlcProtocolMessage {
 
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcRawMessage.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcRawMessage.java
index 0e0ca3a2e..b6205d9f6 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcRawMessage.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcRawMessage.java
@@ -19,7 +19,6 @@ Licensed to the Apache Software Foundation (ASF) under one
 package org.apache.plc4x.java.base.messages;
 
 import io.netty.buffer.ByteBuf;
-import org.apache.plc4x.java.api.messages.PlcProtocolMessage;
 
 public class PlcRawMessage implements PlcProtocolMessage {
 
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcReader.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcReader.java
similarity index 65%
rename from plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcReader.java
rename to plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcReader.java
index d32640963..c403ca1ce 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcReader.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcReader.java
@@ -16,7 +16,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 specific language governing permissions and limitations
 under the License.
 */
-package org.apache.plc4x.java.api.connection;
+package org.apache.plc4x.java.base.messages;
 
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
@@ -35,20 +35,6 @@ Licensed to the Apache Software Foundation (ASF) under one
      * @param readRequest object describing the type and location of the value.
      * @return a {@link CompletableFuture} giving async access to the returned value.
      */
-    CompletableFuture<PlcReadResponse<?>> read(PlcReadRequest readRequest);
-
-    /**
-     * Reads a requested value from a PLC.
-     *
-     * @param readRequestBuilderConsumer consumer which can be used to build requests.
-     * @return a {@link CompletableFuture} giving async access to the returned value.
-     */
-    default CompletableFuture<PlcReadResponse<?>> read(Consumer<PlcReadRequest.Builder> readRequestBuilderConsumer) {
-        PlcReadRequest.Builder requestBuilder = readRequestBuilder();
-        readRequestBuilderConsumer.accept(requestBuilder);
-        return read(requestBuilder.build());
-    }
-
-    PlcReadRequest.Builder readRequestBuilder();
+    CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest);
 
 }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcRequestContainer.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcRequestContainer.java
index 7e83c7de0..f37867807 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcRequestContainer.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcRequestContainer.java
@@ -18,8 +18,6 @@ Licensed to the Apache Software Foundation (ASF) under one
  */
 package org.apache.plc4x.java.base.messages;
 
-import org.apache.plc4x.java.api.messages.PlcProtocolMessage;
-
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcSubscriber.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcSubscriber.java
new file mode 100644
index 000000000..577a774a8
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcSubscriber.java
@@ -0,0 +1,65 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ */
+package org.apache.plc4x.java.base.messages;
+
+import org.apache.plc4x.java.api.messages.*;
+import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
+import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+
+/**
+ * Interface implemented by all PlcConnections that are able to receive notifications from remote resources.
+ */
+public interface PlcSubscriber {
+
+    /**
+     * Subscribes to fields on the PLC.
+     *
+     * @param subscriptionRequest subscription request containing at least one subscription request item.
+     * @return subscription response containing a subscription response item for each subscription request item.
+     */
+    CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionRequest subscriptionRequest);
+
+    /**
+     * Unsubscribes from fields on the PLC. For unsubscribing the unsubscription request uses the subscription
+     * handle returned as part of the subscription response item.
+     *
+     * @param unsubscriptionRequest unsubscription request containing at least one unsubscription request item.
+     * @return unsubscription response containing an unsubscription response item for each unsubscription request item.
+     */
+    CompletableFuture<PlcUnsubscriptionResponse> unsubscribe(PlcUnsubscriptionRequest unsubscriptionRequest);
+
+    /**
+     * @param consumer
+     * @param handles
+     * @return TODO: document me
+     */
+    PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer, Collection<PlcSubscriptionHandle> handles);
+
+    /**
+     * TODO: document me.
+     */
+    void unregister(PlcConsumerRegistration registration);
+
+}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcWriter.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcWriter.java
similarity index 64%
rename from plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcWriter.java
rename to plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcWriter.java
index e974939f7..13a83f42b 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcWriter.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcWriter.java
@@ -16,7 +16,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 specific language governing permissions and limitations
 under the License.
 */
-package org.apache.plc4x.java.api.connection;
+package org.apache.plc4x.java.base.messages;
 
 import org.apache.plc4x.java.api.messages.PlcWriteRequest;
 import org.apache.plc4x.java.api.messages.PlcWriteResponse;
@@ -35,20 +35,6 @@ Licensed to the Apache Software Foundation (ASF) under one
      * @param writeRequest object describing the type, location and value that whould be written.
      * @return a {@link CompletableFuture} giving async access to the response of the write operation.
      */
-    CompletableFuture<PlcWriteResponse<?>> write(PlcWriteRequest writeRequest);
-
-    /**
-     * Writes a given value to a PLC.
-     *
-     * @param writeRequestBuilderConsumer consumer which can be used to build requests.
-     * @return a {@link CompletableFuture} giving async access to the response of the write operation.
-     */
-    default CompletableFuture<PlcWriteResponse<?>> write(Consumer<PlcWriteRequest.Builder> writeRequestBuilderConsumer) {
-        PlcWriteRequest.Builder requestBuilder = writeRequestBuilder();
-        writeRequestBuilderConsumer.accept(requestBuilder);
-        return write(requestBuilder.build());
-    }
-
-    PlcWriteRequest.Builder writeRequestBuilder();
+    CompletableFuture<PlcWriteResponse> write(PlcWriteRequest writeRequest);
 
 }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
index a9c4a6339..bc19ad2a8 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
@@ -25,6 +25,8 @@ Licensed to the Apache Software Foundation (ASF) under one
 import io.netty.util.concurrent.PromiseCombiner;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.lang3.tuple.Triple;
+import org.apache.plc4x.java.base.messages.PlcReader;
+import org.apache.plc4x.java.base.messages.PlcWriter;
 import org.apache.plc4x.java.api.exceptions.PlcProtocolException;
 import org.apache.plc4x.java.api.exceptions.PlcTimeoutException;
 import org.apache.plc4x.java.api.model.PlcField;
@@ -49,26 +51,29 @@ Licensed to the Apache Software Foundation (ASF) under one
 
     private final Timer timer;
 
+    private final PlcReader reader;
+    private final PlcWriter writer;
+
     // TODO: maybe better get from map
     private long defaultReceiveTimeout;
 
     private PendingWriteQueue queue;
 
-    private ConcurrentMap<PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>, Timeout> scheduledTimeouts;
+    private ConcurrentMap<PlcRequestContainer<InternalPlcRequest, InternalPlcResponse>, Timeout> scheduledTimeouts;
 
     // Map to track send subcontainers
-    private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>> sentButUnacknowledgedSubContainer;
+    private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse>> sentButUnacknowledgedSubContainer;
 
     // Map to map tdpu to original parent container
     // TODO: currently this could be supplied via param, only reason to keep would be for statistics.
-    private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>> correlationToParentContainer;
+    private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse>> correlationToParentContainer;
 
     // Map to track tdpus per container
     // TODO: currently this could be supplied via param, only reason to keep would be for statistics.
     private ConcurrentMap<PlcRequestContainer<?, ?>, Set<Integer>> containerCorrelationIdMap;
 
     // Map to track a list of responses per parent container
-    private ConcurrentMap<PlcRequestContainer<?, ?>, Queue<InternalPlcResponse<?>>> responsesToBeDelivered;
+    private ConcurrentMap<PlcRequestContainer<?, ?>, Queue<InternalPlcResponse>> responsesToBeDelivered;
 
     private AtomicInteger correlationIdGenerator;
 
@@ -81,15 +86,17 @@ Licensed to the Apache Software Foundation (ASF) under one
 
     private AtomicLong erroredItems;
 
-    public SingleItemToSingleRequestProtocol(Timer timer) {
-        this(timer, true);
+    public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, Timer timer) {
+        this(reader, writer, timer, true);
     }
 
-    public SingleItemToSingleRequestProtocol(Timer timer, boolean betterImplementationPossible) {
-        this(timer, TimeUnit.SECONDS.toMillis(30), betterImplementationPossible);
+    public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, Timer timer, boolean betterImplementationPossible) {
+        this(reader, writer, timer, TimeUnit.SECONDS.toMillis(30), betterImplementationPossible);
     }
 
-    public SingleItemToSingleRequestProtocol(Timer timer, long defaultReceiveTimeout, boolean betterImplementationPossible) {
+    public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, Timer timer, long defaultReceiveTimeout, boolean betterImplementationPossible) {
+        this.reader = reader;
+        this.writer = writer;
         this.timer = timer;
         this.defaultReceiveTimeout = defaultReceiveTimeout;
         if (betterImplementationPossible) {
@@ -155,16 +162,16 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
     // Decoding
     ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
-    protected void tryFinish(Integer currentTdpu, InternalPlcResponse msg, CompletableFuture<InternalPlcResponse<?>> originalResponseFuture) {
+    protected void tryFinish(Integer currentTdpu, InternalPlcResponse msg, CompletableFuture<InternalPlcResponse> originalResponseFuture) {
         deliveredItems.incrementAndGet();
-        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> subPlcRequestContainer = sentButUnacknowledgedSubContainer.remove(currentTdpu);
+        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse> subPlcRequestContainer = sentButUnacknowledgedSubContainer.remove(currentTdpu);
         LOGGER.info("{} got acknowledged", subPlcRequestContainer);
-        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> originalPlcRequestContainer = correlationToParentContainer.remove(currentTdpu);
+        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse> originalPlcRequestContainer = correlationToParentContainer.remove(currentTdpu);
         if (originalPlcRequestContainer == null) {
             LOGGER.warn("Unrelated package received {}", msg);
             return;
         }
-        Queue<InternalPlcResponse<?>> correlatedResponseItems = responsesToBeDelivered.computeIfAbsent(originalPlcRequestContainer, ignore -> new ConcurrentLinkedQueue<>());
+        Queue<InternalPlcResponse> correlatedResponseItems = responsesToBeDelivered.computeIfAbsent(originalPlcRequestContainer, ignore -> new ConcurrentLinkedQueue<>());
         correlatedResponseItems.add(msg);
         Set<Integer> integers = containerCorrelationIdMap.get(originalPlcRequestContainer);
         integers.remove(currentTdpu);
@@ -175,7 +182,7 @@ protected void tryFinish(Integer currentTdpu, InternalPlcResponse msg, Completab
                 timeout.cancel();
             }
 
-            InternalPlcResponse<?> plcResponse;
+            InternalPlcResponse plcResponse;
             if (originalPlcRequestContainer.getRequest() instanceof InternalPlcReadRequest) {
                 InternalPlcReadRequest internalPlcReadRequest = (InternalPlcReadRequest) originalPlcRequestContainer.getRequest();
                 HashMap<String, Pair<PlcResponseCode, FieldItem>> fields = new HashMap<>();
@@ -206,13 +213,13 @@ protected void tryFinish(Integer currentTdpu, InternalPlcResponse msg, Completab
         }
     }
 
-    protected void errored(Integer currentTdpu, Throwable throwable, CompletableFuture<InternalPlcResponse<?>> originalResponseFuture) {
+    protected void errored(Integer currentTdpu, Throwable throwable, CompletableFuture<InternalPlcResponse> originalResponseFuture) {
         erroredItems.incrementAndGet();
-        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> subPlcRequestContainer = sentButUnacknowledgedSubContainer.remove(currentTdpu);
+        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse> subPlcRequestContainer = sentButUnacknowledgedSubContainer.remove(currentTdpu);
         LOGGER.info("{} got errored", subPlcRequestContainer);
 
 
-        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> originalPlcRequestContainer = correlationToParentContainer.remove(currentTdpu);
+        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse> originalPlcRequestContainer = correlationToParentContainer.remove(currentTdpu);
         if (originalPlcRequestContainer == null) {
             LOGGER.warn("Unrelated error received tdpu:{}", currentTdpu, throwable);
         } else {
@@ -246,7 +253,7 @@ protected void errored(Integer currentTdpu, Throwable throwable, CompletableFutu
     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
         if (msg instanceof PlcRequestContainer) {
             @SuppressWarnings("unchecked")
-            PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> in = (PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>) msg;
+            PlcRequestContainer<InternalPlcRequest, InternalPlcResponse> in = (PlcRequestContainer<InternalPlcRequest, InternalPlcResponse>) msg;
             Set<Integer> tdpus = containerCorrelationIdMap.computeIfAbsent(in, plcRequestContainer -> ConcurrentHashMap.newKeySet());
 
             Timeout timeout = timer.newTimeout(timeout_ -> handleTimeout(timeout_, in, tdpus, System.nanoTime()), defaultReceiveTimeout, TimeUnit.MILLISECONDS);
@@ -275,7 +282,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
                                     tryFinish(tdpu, internalPlcResponse, in.getResponseFuture());
                                 }
                             });
-                        PlcRequestContainer<CorrelatedPlcReadRequest, InternalPlcResponse> correlatedPlcRequestContainer = new PlcRequestContainer<>(CorrelatedPlcReadRequest.of(field, tdpu), correlatedCompletableFuture);
+                        PlcRequestContainer<CorrelatedPlcReadRequest, InternalPlcResponse> correlatedPlcRequestContainer = new PlcRequestContainer<>(CorrelatedPlcReadRequest.of(reader, field, tdpu), correlatedCompletableFuture);
                         correlationToParentContainer.put(tdpu, in);
                         queue.add(correlatedPlcRequestContainer, subPromise);
                         if (!tdpus.add(tdpu)) {
@@ -298,7 +305,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
                                     tryFinish(tdpu, internalPlcResponse, in.getResponseFuture());
                                 }
                             });
-                        PlcRequestContainer<CorrelatedPlcWriteRequest, InternalPlcResponse> correlatedPlcRequestContainer = new PlcRequestContainer<>(CorrelatedPlcWriteRequest.of(fieldItemTriple, tdpu), correlatedCompletableFuture);
+                        PlcRequestContainer<CorrelatedPlcWriteRequest, InternalPlcResponse> correlatedPlcRequestContainer = new PlcRequestContainer<>(CorrelatedPlcWriteRequest.of(writer, fieldItemTriple, tdpu), correlatedCompletableFuture);
                         correlationToParentContainer.put(tdpu, in);
                         queue.add(correlatedPlcRequestContainer, subPromise);
                         if (!tdpus.add(tdpu)) {
@@ -360,7 +367,7 @@ protected synchronized void trySendingMessages(ChannelHandlerContext ctx) {
         ctx.flush();
     }
 
-    private void handleTimeout(Timeout timeout, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> in, Set<Integer> tdpus, long scheduledAt) {
+    private void handleTimeout(Timeout timeout, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse> in, Set<Integer> tdpus, long scheduledAt) {
         if (timeout.isCancelled()) {
             LOGGER.debug("container {} with timeout {} got canceled", in, timeout);
             return;
@@ -386,15 +393,15 @@ private void handleTimeout(Timeout timeout, PlcRequestContainer<InternalPlcReque
 
         protected final int tdpu;
 
-        protected CorrelatedPlcReadRequest(LinkedHashMap<String, PlcField> fields, int tdpu) {
-            super(fields);
+        protected CorrelatedPlcReadRequest(PlcReader reader, LinkedHashMap<String, PlcField> fields, int tdpu) {
+            super(reader, fields);
             this.tdpu = tdpu;
         }
 
-        protected static CorrelatedPlcReadRequest of(Pair<String, PlcField> stringPlcFieldPair, int tdpu) {
+        protected static CorrelatedPlcReadRequest of(PlcReader reader, Pair<String, PlcField> stringPlcFieldPair, int tdpu) {
             LinkedHashMap<String, PlcField> fields = new LinkedHashMap<>();
             fields.put(stringPlcFieldPair.getKey(), stringPlcFieldPair.getValue());
-            return new CorrelatedPlcReadRequest(fields, tdpu);
+            return new CorrelatedPlcReadRequest(reader, fields, tdpu);
         }
 
         @Override
@@ -407,15 +414,15 @@ public int getTdpu() {
 
         private final int tdpu;
 
-        public CorrelatedPlcWriteRequest(LinkedHashMap<String, Pair<PlcField, FieldItem>> fields, int tdpu) {
-            super(fields);
+        public CorrelatedPlcWriteRequest(PlcWriter writer, LinkedHashMap<String, Pair<PlcField, FieldItem>> fields, int tdpu) {
+            super(writer, fields);
             this.tdpu = tdpu;
         }
 
-        public static CorrelatedPlcWriteRequest of(Triple<String, PlcField, FieldItem> fieldItemTriple, int tdpu) {
+        public static CorrelatedPlcWriteRequest of(PlcWriter writer, Triple<String, PlcField, FieldItem> fieldItemTriple, int tdpu) {
             LinkedHashMap<String, Pair<PlcField, FieldItem>> fields = new LinkedHashMap<>();
             fields.put(fieldItemTriple.getLeft(), Pair.of(fieldItemTriple.getMiddle(), fieldItemTriple.getRight()));
-            return new CorrelatedPlcWriteRequest(fields, tdpu);
+            return new CorrelatedPlcWriteRequest(writer, fields, tdpu);
         }
 
         @Override
diff --git a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/messages/PlcRequestContainerTest.java b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/messages/PlcRequestContainerTest.java
index 31ac40a04..019d018de 100644
--- a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/messages/PlcRequestContainerTest.java
+++ b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/messages/PlcRequestContainerTest.java
@@ -18,7 +18,6 @@ Licensed to the Apache Software Foundation (ASF) under one
  */
 package org.apache.plc4x.java.base.messages;
 
-import org.apache.plc4x.java.api.messages.PlcProtocolMessage;
 import org.apache.plc4x.java.api.messages.PlcRequest;
 import org.hamcrest.core.IsEqual;
 import org.junit.Before;
diff --git a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
index 47d06c565..5a9ac98d1 100644
--- a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
+++ b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
@@ -25,6 +25,9 @@ Licensed to the Apache Software Foundation (ASF) under one
 import io.netty.util.HashedWheelTimer;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.plc4x.java.base.messages.PlcReader;
+import org.apache.plc4x.java.base.messages.PlcSubscriber;
+import org.apache.plc4x.java.base.messages.PlcWriter;
 import org.apache.plc4x.java.api.messages.PlcFieldRequest;
 import org.apache.plc4x.java.api.model.PlcField;
 import org.apache.plc4x.java.api.types.PlcResponseCode;
@@ -52,8 +55,18 @@ Licensed to the Apache Software Foundation (ASF) under one
 @ExtendWith(MockitoExtension.class)
 class SingleItemToSingleRequestProtocolTest implements WithAssertions {
 
+    PlcReader mockReader = null;
+    PlcWriter mockWriter = null;
+    PlcSubscriber mockSubscriber = null;
+
     @InjectMocks
-    SingleItemToSingleRequestProtocol SUT = new SingleItemToSingleRequestProtocol(new HashedWheelTimer(), TimeUnit.SECONDS.toMillis(1), false);
+    SingleItemToSingleRequestProtocol SUT = new SingleItemToSingleRequestProtocol(
+        mockReader,
+        mockWriter,
+        new HashedWheelTimer(),
+        TimeUnit.SECONDS.toMillis(1),
+        false
+    );
 
     @Mock(answer = Answers.RETURNS_DEEP_STUBS)
     ChannelHandlerContext channelHandlerContext;
@@ -138,7 +151,7 @@ void channelInactive() throws Exception {
         void simpleRead() throws Exception {
             // Given
             // we have a simple read
-            PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(), responseCompletableFuture);
+            PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
             // When
             // we write this
             SUT.write(channelHandlerContext, msg, channelPromise);
@@ -169,7 +182,7 @@ void simpleRead() throws Exception {
         void partialRead() throws Exception {
             // Given
             // we have a simple read
-            PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(), responseCompletableFuture);
+            PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
             // When
             // we write this
             SUT.write(channelHandlerContext, msg, channelPromise);
@@ -202,7 +215,7 @@ void partialRead() throws Exception {
         void partialReadOneErrored() throws Exception {
             // Given
             // we have a simple read
-            PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(), responseCompletableFuture);
+            PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
             // When
             // we write this
             SUT.write(channelHandlerContext, msg, channelPromise);
@@ -241,7 +254,7 @@ void partialReadOneErrored() throws Exception {
         void noRead() throws Exception {
             // Given
             // we have a simple read
-            PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(), responseCompletableFuture);
+            PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
             // When
             // we write this
             SUT.write(channelHandlerContext, msg, channelPromise);
@@ -314,7 +327,7 @@ void empty() throws Exception {
         @Test
         void read() throws Exception {
             // Given
-            PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(), responseCompletableFuture);
+            PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
             // When
             SUT.write(channelHandlerContext, msg, channelPromise);
             // Then
@@ -345,7 +358,7 @@ void read() throws Exception {
         @Test
         void write() throws Exception {
             // Given
-            PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcWriteRequest.build(), responseCompletableFuture);
+            PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcWriteRequest.build(mockWriter), responseCompletableFuture);
             // When
             SUT.write(channelHandlerContext, msg, channelPromise);
             // Then
@@ -395,48 +408,51 @@ void trySendingMessages() throws Exception {
     }
 
     private static class TestDefaultPlcReadRequest extends DefaultPlcReadRequest {
-
-        private TestDefaultPlcReadRequest(LinkedHashMap<String, PlcField> fields) {
-            super(fields);
+        private TestDefaultPlcReadRequest(PlcReader reader, LinkedHashMap<String, PlcField> fields) {
+            super(reader, fields);
         }
 
-        private static TestDefaultPlcReadRequest build() {
+        private static TestDefaultPlcReadRequest build(PlcReader reader) {
             LinkedHashMap<String, PlcField> fields = new LinkedHashMap<>();
             IntStream.rangeClosed(1, 5).forEach(i -> fields.put("readField" + i, mock(PlcField.class)));
-            return new TestDefaultPlcReadRequest(fields);
+            return new TestDefaultPlcReadRequest(reader, fields);
         }
     }
 
     private static class TestDefaultPlcWriteRequest extends DefaultPlcWriteRequest {
 
-        private TestDefaultPlcWriteRequest(LinkedHashMap<String, Pair<PlcField, FieldItem>> fields) {
-            super(fields);
+        private TestDefaultPlcWriteRequest(PlcWriter writer, LinkedHashMap<String, Pair<PlcField, FieldItem>> fields) {
+            super(writer, fields);
         }
 
-        private static TestDefaultPlcWriteRequest build() {
+        private static TestDefaultPlcWriteRequest build(PlcWriter writer) {
             LinkedHashMap<String, Pair<PlcField, FieldItem>> fields = new LinkedHashMap<>();
             IntStream.rangeClosed(1, 5).forEach(i -> fields.put("writeField" + i, Pair.of(mock(PlcField.class), mock(FieldItem.class))));
-            return new TestDefaultPlcWriteRequest(fields);
+            return new TestDefaultPlcWriteRequest(writer, fields);
         }
     }
 
     private static class TestDefaultPlcSubscriptionRequest extends DefaultPlcSubscriptionRequest {
 
-        private static TestDefaultPlcSubscriptionRequest build() {
+        private TestDefaultPlcSubscriptionRequest(PlcSubscriber subscriber) {
+            super(subscriber);
+        }
+
+        private static TestDefaultPlcSubscriptionRequest build(PlcSubscriber subscriber) {
             // TODO: implement me once available
-            return new TestDefaultPlcSubscriptionRequest();
+            return new TestDefaultPlcSubscriptionRequest(subscriber);
         }
     }
 
     private static class TestDefaultPlcUnsubscriptionRequest extends DefaultPlcUnsubscriptionRequest {
 
-        private TestDefaultPlcUnsubscriptionRequest(Collection<? extends InternalPlcSubscriptionHandle> internalPlcSubscriptionHandles) {
-            super(internalPlcSubscriptionHandles);
+        private TestDefaultPlcUnsubscriptionRequest(PlcSubscriber subscriber, Collection<? extends InternalPlcSubscriptionHandle> internalPlcSubscriptionHandles) {
+            super(subscriber, internalPlcSubscriptionHandles);
         }
 
-        private static TestDefaultPlcUnsubscriptionRequest build() {
+        private static TestDefaultPlcUnsubscriptionRequest build(PlcSubscriber subscriber) {
             // TODO: implement me once available
-            return new TestDefaultPlcUnsubscriptionRequest(Collections.emptyList());
+            return new TestDefaultPlcUnsubscriptionRequest(subscriber, Collections.emptyList());
         }
     }
 }
\ No newline at end of file
diff --git a/plc4j/protocols/driver-bases/test/src/main/java/org/apache/plc4x/java/base/connection/MockConnection.java b/plc4j/protocols/driver-bases/test/src/main/java/org/apache/plc4x/java/base/connection/MockConnection.java
index 3c64768d7..21d19d279 100644
--- a/plc4j/protocols/driver-bases/test/src/main/java/org/apache/plc4x/java/base/connection/MockConnection.java
+++ b/plc4j/protocols/driver-bases/test/src/main/java/org/apache/plc4x/java/base/connection/MockConnection.java
@@ -19,8 +19,12 @@ Licensed to the Apache Software Foundation (ASF) under one
 package org.apache.plc4x.java.base.connection;
 
 import io.netty.channel.ChannelHandler;
-import org.apache.plc4x.java.api.model.PlcField;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcWriteRequest;
 
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 public class MockConnection extends AbstractPlcConnection {
@@ -33,6 +37,26 @@ public MockConnection(boolean awaitSessionSetupComplete) {
         super(new TestChannelFactory(), awaitSessionSetupComplete);
     }
 
+    @Override
+    public Optional<PlcReadRequest.Builder> readRequestBuilder() {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<PlcWriteRequest.Builder> writeRequestBuilder() {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<PlcSubscriptionRequest.Builder> subscriptionRequestBuilder() {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<PlcUnsubscriptionRequest.Builder> unsubscriptionRequestBuilder() {
+        return Optional.empty();
+    }
+
     @Override
     protected ChannelHandler getChannelHandler(CompletableFuture<Void> sessionSetupCompleteFuture) {
         return null;
diff --git a/plc4j/protocols/ethernetip/src/main/java/org/apache/plc4x/java/ethernetip/EtherNetIpPlcDriver.java b/plc4j/protocols/ethernetip/src/main/java/org/apache/plc4x/java/ethernetip/EtherNetIpPlcDriver.java
index 69aa03abb..47a487b83 100644
--- a/plc4j/protocols/ethernetip/src/main/java/org/apache/plc4x/java/ethernetip/EtherNetIpPlcDriver.java
+++ b/plc4j/protocols/ethernetip/src/main/java/org/apache/plc4x/java/ethernetip/EtherNetIpPlcDriver.java
@@ -18,9 +18,9 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.java.ethernetip;
 
-import org.apache.plc4x.java.api.PlcDriver;
+import org.apache.plc4x.java.spi.PlcDriver;
 import org.apache.plc4x.java.api.authentication.PlcAuthentication;
-import org.apache.plc4x.java.api.connection.PlcConnection;
+import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.ethernetip.connection.EtherNetIpTcpPlcConnection;
 
diff --git a/plc4j/protocols/ethernetip/src/main/java/org/apache/plc4x/java/ethernetip/connection/BaseEtherNetIpPlcConnection.java b/plc4j/protocols/ethernetip/src/main/java/org/apache/plc4x/java/ethernetip/connection/BaseEtherNetIpPlcConnection.java
index 4aa7a4702..80dd05426 100644
--- a/plc4j/protocols/ethernetip/src/main/java/org/apache/plc4x/java/ethernetip/connection/BaseEtherNetIpPlcConnection.java
+++ b/plc4j/protocols/ethernetip/src/main/java/org/apache/plc4x/java/ethernetip/connection/BaseEtherNetIpPlcConnection.java
@@ -19,9 +19,12 @@ Licensed to the Apache Software Foundation (ASF) under one
 package org.apache.plc4x.java.ethernetip.connection;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.plc4x.java.api.connection.PlcReader;
-import org.apache.plc4x.java.api.connection.PlcWriter;
-import org.apache.plc4x.java.api.messages.*;
+import org.apache.plc4x.java.base.messages.PlcReader;
+import org.apache.plc4x.java.base.messages.PlcWriter;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.java.api.messages.PlcWriteResponse;
 import org.apache.plc4x.java.base.connection.AbstractPlcConnection;
 import org.apache.plc4x.java.base.connection.ChannelFactory;
 import org.apache.plc4x.java.base.messages.*;
@@ -29,6 +32,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 public abstract class BaseEtherNetIpPlcConnection extends AbstractPlcConnection implements PlcReader, PlcWriter {
@@ -56,12 +60,17 @@ Licensed to the Apache Software Foundation (ASF) under one
     }
 
     @Override
-    public PlcReadRequest.Builder readRequestBuilder() {
-        return new DefaultPlcReadRequest.Builder(new EnipPlcFieldHandler());
+    public Optional<PlcReadRequest.Builder> readRequestBuilder() {
+        return Optional.of(new DefaultPlcReadRequest.Builder(this, new EnipPlcFieldHandler()));
     }
 
     @Override
-    public CompletableFuture<PlcReadResponse<?>> read(PlcReadRequest readRequest) {
+    public Optional<PlcWriteRequest.Builder> writeRequestBuilder() {
+        return Optional.of(new DefaultPlcWriteRequest.Builder(this, new EnipPlcFieldHandler()));
+    }
+
+    @Override
+    public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) {
         CompletableFuture<InternalPlcReadResponse> future = new CompletableFuture<>();
         PlcRequestContainer<InternalPlcReadRequest, InternalPlcReadResponse> container =
             new PlcRequestContainer<>((InternalPlcReadRequest) readRequest, future);
@@ -75,12 +84,7 @@ Licensed to the Apache Software Foundation (ASF) under one
     }
 
     @Override
-    public PlcWriteRequest.Builder writeRequestBuilder() {
-        return new DefaultPlcWriteRequest.Builder(new EnipPlcFieldHandler());
-    }
-
-    @Override
-    public CompletableFuture<PlcWriteResponse<?>> write(PlcWriteRequest writeRequest) {
+    public CompletableFuture<PlcWriteResponse> write(PlcWriteRequest writeRequest) {
         CompletableFuture<InternalPlcWriteResponse> future = new CompletableFuture<>();
         PlcRequestContainer<InternalPlcWriteRequest, InternalPlcWriteResponse> container =
             new PlcRequestContainer<>((InternalPlcWriteRequest) writeRequest, future);
diff --git a/plc4j/protocols/ethernetip/src/main/java/org/apache/plc4x/java/ethernetip/connection/EtherNetIpTcpPlcConnection.java b/plc4j/protocols/ethernetip/src/main/java/org/apache/plc4x/java/ethernetip/connection/EtherNetIpTcpPlcConnection.java
index e14ac1934..3f0ed79dd 100644
--- a/plc4j/protocols/ethernetip/src/main/java/org/apache/plc4x/java/ethernetip/connection/EtherNetIpTcpPlcConnection.java
+++ b/plc4j/protocols/ethernetip/src/main/java/org/apache/plc4x/java/ethernetip/connection/EtherNetIpTcpPlcConnection.java
@@ -19,6 +19,8 @@ Licensed to the Apache Software Foundation (ASF) under one
 package org.apache.plc4x.java.ethernetip.connection;
 
 import io.netty.channel.*;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
 import org.apache.plc4x.java.base.connection.ChannelFactory;
 import org.apache.plc4x.java.base.connection.TcpSocketChannelFactory;
 import org.apache.plc4x.java.base.events.ConnectEvent;
@@ -29,6 +31,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.slf4j.LoggerFactory;
 
 import java.net.InetAddress;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 public class EtherNetIpTcpPlcConnection extends BaseEtherNetIpPlcConnection {
@@ -52,6 +55,16 @@ public EtherNetIpTcpPlcConnection(ChannelFactory channelFactory, String params)
         super(channelFactory, params);
     }
 
+    @Override
+    public Optional<PlcSubscriptionRequest.Builder> subscriptionRequestBuilder() {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<PlcUnsubscriptionRequest.Builder> unsubscriptionRequestBuilder() {
+        return Optional.empty();
+    }
+
     @Override
     protected ChannelHandler getChannelHandler(CompletableFuture<Void> sessionSetupCompleteFuture) {
         return new ChannelInitializer() {
diff --git a/plc4j/protocols/ethernetip/src/main/resources/META-INF/services/org.apache.plc4x.java.api.PlcDriver b/plc4j/protocols/ethernetip/src/main/resources/META-INF/services/org.apache.plc4x.java.spi.PlcDriver
similarity index 100%
rename from plc4j/protocols/ethernetip/src/main/resources/META-INF/services/org.apache.plc4x.java.api.PlcDriver
rename to plc4j/protocols/ethernetip/src/main/resources/META-INF/services/org.apache.plc4x.java.spi.PlcDriver
diff --git a/plc4j/protocols/ethernetip/src/test/java/org/apache/plc4x/java/ethernetip/ManualPlc4XEtherNetIpTest.java b/plc4j/protocols/ethernetip/src/test/java/org/apache/plc4x/java/ethernetip/ManualPlc4XEtherNetIpTest.java
index 8750d5ed5..27d7f39c0 100644
--- a/plc4j/protocols/ethernetip/src/test/java/org/apache/plc4x/java/ethernetip/ManualPlc4XEtherNetIpTest.java
+++ b/plc4j/protocols/ethernetip/src/test/java/org/apache/plc4x/java/ethernetip/ManualPlc4XEtherNetIpTest.java
@@ -19,8 +19,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 package org.apache.plc4x.java.ethernetip;
 
 import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcReader;
+import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
 
@@ -34,15 +33,12 @@ public static void main(String... args) {
         try (PlcConnection plcConnection = new PlcDriverManager().getConnection(connectionUrl)) {
             System.out.println("PlcConnection " + plcConnection);
 
-            // Get a reader instance.
-            PlcReader reader = plcConnection.getReader().orElseThrow(() -> new RuntimeException("No Reader found"));
-
-            PlcReadRequest readRequest = reader.readRequestBuilder()
+            PlcReadRequest readRequest = plcConnection.readRequestBuilder().orElseThrow(() -> new RuntimeException("Reading not supported"))
                 .addItem("field", "#4#105#3").build();
 
             // Execute the read operation.
-            CompletableFuture<PlcReadResponse<?>> response = reader.read(readRequest);
-            PlcReadResponse<?> readResponse = response.get();
+            CompletableFuture<? extends PlcReadResponse> response = readRequest.execute();
+            PlcReadResponse readResponse = response.get();
 
             // Output the response.
             for (String fieldName : readResponse.getFieldNames()) {
diff --git a/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/ModbusPlcDriver.java b/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/ModbusPlcDriver.java
index 0aad72ef6..30d9091e3 100644
--- a/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/ModbusPlcDriver.java
+++ b/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/ModbusPlcDriver.java
@@ -19,9 +19,9 @@ Licensed to the Apache Software Foundation (ASF) under one
 package org.apache.plc4x.java.modbus;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.plc4x.java.api.PlcDriver;
+import org.apache.plc4x.java.spi.PlcDriver;
 import org.apache.plc4x.java.api.authentication.PlcAuthentication;
-import org.apache.plc4x.java.api.connection.PlcConnection;
+import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.modbus.connection.ModbusConnectionFactory;
 
diff --git a/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/connection/BaseModbusPlcConnection.java b/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/connection/BaseModbusPlcConnection.java
index a9db2af67..2f983e756 100644
--- a/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/connection/BaseModbusPlcConnection.java
+++ b/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/connection/BaseModbusPlcConnection.java
@@ -19,12 +19,9 @@ Licensed to the Apache Software Foundation (ASF) under one
 package org.apache.plc4x.java.modbus.connection;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.plc4x.java.api.connection.PlcReader;
-import org.apache.plc4x.java.api.connection.PlcWriter;
-import org.apache.plc4x.java.api.messages.PlcReadRequest;
-import org.apache.plc4x.java.api.messages.PlcReadResponse;
-import org.apache.plc4x.java.api.messages.PlcWriteRequest;
-import org.apache.plc4x.java.api.messages.PlcWriteResponse;
+import org.apache.plc4x.java.base.messages.PlcReader;
+import org.apache.plc4x.java.base.messages.PlcWriter;
+import org.apache.plc4x.java.api.messages.*;
 import org.apache.plc4x.java.base.connection.AbstractPlcConnection;
 import org.apache.plc4x.java.base.connection.ChannelFactory;
 import org.apache.plc4x.java.base.messages.*;
@@ -32,6 +29,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 public abstract class BaseModbusPlcConnection extends AbstractPlcConnection implements PlcReader, PlcWriter {
@@ -59,12 +57,27 @@ Licensed to the Apache Software Foundation (ASF) under one
     }
 
     @Override
-    public PlcReadRequest.Builder readRequestBuilder() {
-        return new DefaultPlcReadRequest.Builder(new ModbusPlcFieldHandler());
+    public Optional<PlcReadRequest.Builder> readRequestBuilder() {
+        return Optional.of(new DefaultPlcReadRequest.Builder(this, new ModbusPlcFieldHandler()));
     }
 
     @Override
-    public CompletableFuture<PlcReadResponse<?>> read(PlcReadRequest readRequest) {
+    public Optional<PlcWriteRequest.Builder> writeRequestBuilder() {
+        return Optional.of(new DefaultPlcWriteRequest.Builder(this, new ModbusPlcFieldHandler()));
+    }
+
+    @Override
+    public Optional<PlcSubscriptionRequest.Builder> subscriptionRequestBuilder() {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<PlcUnsubscriptionRequest.Builder> unsubscriptionRequestBuilder() {
+        return Optional.empty();
+    }
+
+    @Override
+    public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) {
         CompletableFuture<InternalPlcReadResponse> future = new CompletableFuture<>();
         PlcRequestContainer<InternalPlcReadRequest, InternalPlcReadResponse> container =
             new PlcRequestContainer<>((InternalPlcReadRequest) readRequest, future);
@@ -78,12 +91,7 @@ Licensed to the Apache Software Foundation (ASF) under one
     }
 
     @Override
-    public PlcWriteRequest.Builder writeRequestBuilder() {
-        return new DefaultPlcWriteRequest.Builder(new ModbusPlcFieldHandler());
-    }
-
-    @Override
-    public CompletableFuture<PlcWriteResponse<?>> write(PlcWriteRequest writeRequest) {
+    public CompletableFuture<PlcWriteResponse> write(PlcWriteRequest writeRequest) {
         CompletableFuture<InternalPlcWriteResponse> future = new CompletableFuture<>();
         PlcRequestContainer<InternalPlcWriteRequest, InternalPlcWriteResponse> container =
             new PlcRequestContainer<>((InternalPlcWriteRequest) writeRequest, future);
diff --git a/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/connection/ModbusTcpPlcConnection.java b/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/connection/ModbusTcpPlcConnection.java
index 1bcdb6747..8df399ea9 100644
--- a/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/connection/ModbusTcpPlcConnection.java
+++ b/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/connection/ModbusTcpPlcConnection.java
@@ -24,6 +24,8 @@ Licensed to the Apache Software Foundation (ASF) under one
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelInitializer;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
 import org.apache.plc4x.java.base.connection.ChannelFactory;
 import org.apache.plc4x.java.base.connection.TcpSocketChannelFactory;
 import org.apache.plc4x.java.base.protocol.SingleItemToSingleRequestProtocol;
@@ -32,6 +34,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.slf4j.LoggerFactory;
 
 import java.net.InetAddress;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 public class ModbusTcpPlcConnection extends BaseModbusPlcConnection {
@@ -69,7 +72,7 @@ protected ChannelHandler getChannelHandler(CompletableFuture<Void> sessionSetupC
             protected void initChannel(Channel channel) {
                 channel.pipeline().addLast(new ModbusTcpCodec(new ModbusRequestEncoder(), new ModbusResponseDecoder()));
                 channel.pipeline().addLast(new Plc4XModbusProtocol());
-                channel.pipeline().addLast(new SingleItemToSingleRequestProtocol(timer));
+                channel.pipeline().addLast(new SingleItemToSingleRequestProtocol(ModbusTcpPlcConnection.this, ModbusTcpPlcConnection.this, timer));
             }
         };
     }
diff --git a/plc4j/protocols/modbus/src/main/resources/META-INF/services/org.apache.plc4x.java.api.PlcDriver b/plc4j/protocols/modbus/src/main/resources/META-INF/services/org.apache.plc4x.java.spi.PlcDriver
similarity index 100%
rename from plc4j/protocols/modbus/src/main/resources/META-INF/services/org.apache.plc4x.java.api.PlcDriver
rename to plc4j/protocols/modbus/src/main/resources/META-INF/services/org.apache.plc4x.java.spi.PlcDriver
diff --git a/plc4j/protocols/modbus/src/test/java/org/apache/plc4x/java/modbus/ManualPlc4XModbusTest.java b/plc4j/protocols/modbus/src/test/java/org/apache/plc4x/java/modbus/ManualPlc4XModbusTest.java
index bfa1f2c5e..ded2ea3f6 100644
--- a/plc4j/protocols/modbus/src/test/java/org/apache/plc4x/java/modbus/ManualPlc4XModbusTest.java
+++ b/plc4j/protocols/modbus/src/test/java/org/apache/plc4x/java/modbus/ManualPlc4XModbusTest.java
@@ -20,10 +20,10 @@ Licensed to the Apache Software Foundation (ASF) under one
 
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcReader;
-import org.apache.plc4x.java.api.connection.PlcWriter;
+import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.messages.PlcWriteRequest;
 import org.apache.plc4x.java.api.messages.PlcWriteResponse;
 import org.apache.plc4x.java.base.util.HexUtil;
 
@@ -48,9 +48,9 @@ public static void main(String... args) {
             System.out.println("PlcConnection " + plcConnection);
 
             {
-                PlcReader reader = plcConnection.getReader().orElseThrow(() -> new RuntimeException("No Reader found"));
-
-                PlcReadResponse<?> readResponse = reader.read(builder -> builder.addItem("randomRegister", "register:7[3]")).get();
+                PlcReadRequest readRequest = plcConnection.readRequestBuilder().orElseThrow(() -> new RuntimeException("No Reader found"))
+                .addItem("randomRegister", "register:7[3]").build();
+                PlcReadResponse readResponse = readRequest.execute().get();
                 System.out.println("Response " + readResponse);
                 readResponse.getAllByteArrays("randomRegister").stream()
                     .map(HexUtil::toHex)
@@ -60,10 +60,11 @@ public static void main(String... args) {
 
             {
                 // Read an int from 2 registers
-                PlcReader reader = plcConnection.getReader().orElseThrow(() -> new RuntimeException("No Reader found"));
 
                 // Just dump the actual values
-                PlcReadResponse<?> readResponse = reader.read(builder -> builder.addItem("randomRegister", "register:3[2]")).get();
+                PlcReadRequest readRequest = plcConnection.readRequestBuilder().orElseThrow(() -> new RuntimeException("No Reader found"))
+                    .addItem("randomRegister", "register:3[2]").build();
+                PlcReadResponse readResponse = readRequest.execute().get();
                 System.out.println("Response " + readResponse);
                 Collection<Byte[]> randomRegisters = readResponse.getAllByteArrays("randomRegister");
                 randomRegisters.stream()
@@ -83,16 +84,16 @@ public static void main(String... args) {
 
             {
                 // Read an int from 2 registers and multiple requests
-                PlcReader reader = plcConnection.getReader().orElseThrow(() -> new RuntimeException("No Reader found"));
 
                 // Just dump the actual values
-                PlcReadResponse<?> readResponse = reader.read(builder -> builder
+                PlcReadRequest readRequest = plcConnection.readRequestBuilder().orElseThrow(() -> new RuntimeException("No Reader found"))
                     .addItem("randomRegister1", "register:1[2]")
                     .addItem("randomRegister2", "register:10[3]")
                     .addItem("randomRegister3", "register:20[4]")
                     .addItem("randomRegister4", "register:30[5]")
                     .addItem("randomRegister5", "register:40[6]")
-                ).get();
+                    .build();
+                PlcReadResponse readResponse = readRequest.execute().get();
                 System.out.println("Response " + readResponse);
                 IntStream.range(1, 6).forEach(i -> {
                     Collection<Byte[]> randomRegisters = readResponse.getAllByteArrays("randomRegister" + i);
@@ -113,9 +114,9 @@ public static void main(String... args) {
             }
 
             {
-                PlcReader reader = plcConnection.getReader().orElseThrow(() -> new RuntimeException("No Reader found"));
-
-                PlcReadResponse<?> readResponse = reader.read(builder -> builder.addItem("randomCoil", "coil:1[9]")).get();
+                PlcReadRequest readRequest = plcConnection.readRequestBuilder().orElseThrow(() -> new RuntimeException("No Reader found"))
+                    .addItem("randomCoil", "coil:1[9]").build();
+                PlcReadResponse readResponse = readRequest.execute().get();
                 System.out.println("Response " + readResponse);
                 readResponse.getAllBooleans("randomCoil").stream()
                     .map(hex -> "Coil Value: " + hex)
@@ -123,9 +124,9 @@ public static void main(String... args) {
             }
 
             {
-                PlcWriter writer = plcConnection.getWriter().orElseThrow(() -> new RuntimeException("No Writer found"));
-
-                PlcWriteResponse<?> writeResponse = writer.write(builder -> builder.addItem("randomCoilField", "coil:1", true)).get();
+                PlcWriteRequest writeRequest = plcConnection.writeRequestBuilder().orElseThrow(() -> new RuntimeException("No Writer found"))
+                    .addItem("randomCoilField", "coil:1", true).build();
+                PlcWriteResponse writeResponse = writeRequest.execute().get();
                 System.out.println("Response " + writeResponse);
             }
         } catch (Exception e) {
diff --git a/plc4j/protocols/modbus/src/test/java/org/apache/plc4x/java/modbus/connection/BaseModbusPlcConnectionTest.java b/plc4j/protocols/modbus/src/test/java/org/apache/plc4x/java/modbus/connection/BaseModbusPlcConnectionTest.java
index 62f94cb9a..8a4c41993 100644
--- a/plc4j/protocols/modbus/src/test/java/org/apache/plc4x/java/modbus/connection/BaseModbusPlcConnectionTest.java
+++ b/plc4j/protocols/modbus/src/test/java/org/apache/plc4x/java/modbus/connection/BaseModbusPlcConnectionTest.java
@@ -90,7 +90,7 @@ protected ChannelHandler getChannelHandler(CompletableFuture<Void> sessionSetupC
 
     @Test
     public void read() {
-        CompletableFuture<PlcReadResponse<?>> read = SUT.read(mock(InternalPlcReadRequest.class));
+        CompletableFuture<PlcReadResponse> read = SUT.read(mock(InternalPlcReadRequest.class));
         assertNotNull(read);
 
         simulatePipelineError(() -> SUT.read(mock(InternalPlcReadRequest.class)));
@@ -98,7 +98,7 @@ public void read() {
 
     @Test
     public void write() {
-        CompletableFuture<PlcWriteResponse<?>> write = SUT.write(mock(InternalPlcWriteRequest.class));
+        CompletableFuture<PlcWriteResponse> write = SUT.write(mock(InternalPlcWriteRequest.class));
         assertNotNull(write);
 
         simulatePipelineError(() -> SUT.write(mock(InternalPlcWriteRequest.class)));
diff --git a/plc4j/protocols/modbus/src/test/java/org/apache/plc4x/java/modbus/connection/ModbusSerialPlcConnectionTest.java b/plc4j/protocols/modbus/src/test/java/org/apache/plc4x/java/modbus/connection/ModbusSerialPlcConnectionTest.java
index b0d1478b8..1179ee0ad 100644
--- a/plc4j/protocols/modbus/src/test/java/org/apache/plc4x/java/modbus/connection/ModbusSerialPlcConnectionTest.java
+++ b/plc4j/protocols/modbus/src/test/java/org/apache/plc4x/java/modbus/connection/ModbusSerialPlcConnectionTest.java
@@ -24,6 +24,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import io.netty.channel.jsc.JSerialCommDeviceAddress;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.commons.lang3.reflect.MethodUtils;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
 import org.apache.plc4x.java.base.connection.AbstractPlcConnection;
 import org.apache.plc4x.java.base.connection.SerialChannelFactory;
@@ -65,7 +66,8 @@ public void tearDown() {
     @Test
     public void testRead() throws Exception {
         prepareSerialSimulator();
-        CompletableFuture<PlcReadResponse<?>> read = SUT.read(builder -> builder.addItem("randomRead", "0/0"));
+        PlcReadRequest readRequest = SUT.readRequestBuilder().get().addItem("randomRead", "0/0").build();
+        CompletableFuture<PlcReadResponse> read = SUT.read(readRequest);
         PlcReadResponse plcReadResponse = read.get(30, TimeUnit.SECONDS);
         assertNotNull(plcReadResponse);
     }
diff --git a/plc4j/protocols/modbus/src/test/java/org/apache/plc4x/java/modbus/netty/Plc4XModbusProtocolTest.java b/plc4j/protocols/modbus/src/test/java/org/apache/plc4x/java/modbus/netty/Plc4XModbusProtocolTest.java
index b67f97af8..3a40f125c 100644
--- a/plc4j/protocols/modbus/src/test/java/org/apache/plc4x/java/modbus/netty/Plc4XModbusProtocolTest.java
+++ b/plc4j/protocols/modbus/src/test/java/org/apache/plc4x/java/modbus/netty/Plc4XModbusProtocolTest.java
@@ -124,7 +124,7 @@ Licensed to the Apache Software Foundation (ASF) under one
     private static ImmutablePair<PlcRequestContainer<InternalPlcReadRequest, InternalPlcResponse>, ModbusTcpPayload> producePair(Class type, String field, ModbusPdu modbusPdu) {
         return ImmutablePair.of(
             new PlcRequestContainer<>(
-                (InternalPlcReadRequest) new DefaultPlcReadRequest.Builder(new ModbusPlcFieldHandler())
+                (InternalPlcReadRequest) new DefaultPlcReadRequest.Builder(null, new ModbusPlcFieldHandler()) // TODO: remove null
                     .addItem(RandomStringUtils.randomAlphabetic(10), field)
                     .build(), new CompletableFuture<>()),
             new ModbusTcpPayload((short) 0, (short) 0, modbusPdu)
@@ -139,7 +139,7 @@ Licensed to the Apache Software Foundation (ASF) under one
         if (values.length == 1) {
             return ImmutablePair.of(
                 new PlcRequestContainer<>(
-                    (InternalPlcWriteRequest) new DefaultPlcWriteRequest.Builder(new ModbusPlcFieldHandler())
+                    (InternalPlcWriteRequest) new DefaultPlcWriteRequest.Builder(null, new ModbusPlcFieldHandler()) // TODO: remove null
                         .addItem(RandomStringUtils.randomAlphabetic(10), field, values[0])
                         .build(), new CompletableFuture<>()),
                 new ModbusTcpPayload((short) 0, (short) 0, modbusPdu)
@@ -147,7 +147,7 @@ Licensed to the Apache Software Foundation (ASF) under one
         } else {
             return ImmutablePair.of(
                 new PlcRequestContainer<>(
-                    (InternalPlcWriteRequest) new DefaultPlcWriteRequest.Builder(new ModbusPlcFieldHandler())
+                    (InternalPlcWriteRequest) new DefaultPlcWriteRequest.Builder(null, new ModbusPlcFieldHandler()) // TODO: remove null
                         .addItem(RandomStringUtils.randomAlphabetic(10), field, values)
                         .build(), new CompletableFuture<>()),
                 new ModbusTcpPayload((short) 0, (short) 0, modbusPdu)
diff --git a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/isotp/netty/model/tpdus/DataTpdu.java b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/isotp/netty/model/tpdus/DataTpdu.java
index 49dab91fb..1134954d5 100644
--- a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/isotp/netty/model/tpdus/DataTpdu.java
+++ b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/isotp/netty/model/tpdus/DataTpdu.java
@@ -19,7 +19,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 package org.apache.plc4x.java.isotp.netty.model.tpdus;
 
 import io.netty.buffer.ByteBuf;
-import org.apache.plc4x.java.api.messages.PlcProtocolMessage;
+import org.apache.plc4x.java.base.messages.PlcProtocolMessage;
 import org.apache.plc4x.java.isotp.netty.model.params.Parameter;
 import org.apache.plc4x.java.isotp.netty.model.types.TpduCode;
 
diff --git a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/isotp/netty/model/tpdus/Tpdu.java b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/isotp/netty/model/tpdus/Tpdu.java
index af5b11607..69e854546 100644
--- a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/isotp/netty/model/tpdus/Tpdu.java
+++ b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/isotp/netty/model/tpdus/Tpdu.java
@@ -19,7 +19,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 package org.apache.plc4x.java.isotp.netty.model.tpdus;
 
 import io.netty.buffer.ByteBuf;
-import org.apache.plc4x.java.api.messages.PlcProtocolMessage;
+import org.apache.plc4x.java.base.messages.PlcProtocolMessage;
 import org.apache.plc4x.java.base.messages.PlcRawMessage;
 import org.apache.plc4x.java.isotp.netty.model.params.Parameter;
 import org.apache.plc4x.java.isotp.netty.model.types.TpduCode;
diff --git a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/S7PlcDriver.java b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/S7PlcDriver.java
index 1d6b576f5..839ed3036 100644
--- a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/S7PlcDriver.java
+++ b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/S7PlcDriver.java
@@ -18,9 +18,9 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.java.s7;
 
-import org.apache.plc4x.java.api.PlcDriver;
+import org.apache.plc4x.java.spi.PlcDriver;
 import org.apache.plc4x.java.api.authentication.PlcAuthentication;
-import org.apache.plc4x.java.api.connection.PlcConnection;
+import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.s7.connection.S7PlcConnection;
 
diff --git a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/connection/S7PlcConnection.java b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/connection/S7PlcConnection.java
index 731819824..037beea36 100644
--- a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/connection/S7PlcConnection.java
+++ b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/connection/S7PlcConnection.java
@@ -22,13 +22,10 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.commons.configuration2.Configuration;
 import org.apache.commons.configuration2.SystemConfiguration;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.plc4x.java.api.connection.PlcReader;
-import org.apache.plc4x.java.api.connection.PlcWriter;
+import org.apache.plc4x.java.base.messages.PlcReader;
+import org.apache.plc4x.java.base.messages.PlcWriter;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.api.messages.PlcReadRequest;
-import org.apache.plc4x.java.api.messages.PlcReadResponse;
-import org.apache.plc4x.java.api.messages.PlcWriteRequest;
-import org.apache.plc4x.java.api.messages.PlcWriteResponse;
+import org.apache.plc4x.java.api.messages.*;
 import org.apache.plc4x.java.base.connection.AbstractPlcConnection;
 import org.apache.plc4x.java.base.connection.ChannelFactory;
 import org.apache.plc4x.java.base.connection.TcpSocketChannelFactory;
@@ -51,6 +48,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 
 import java.net.InetAddress;
 import java.util.Collections;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -237,12 +235,27 @@ public void close() throws PlcConnectionException {
     }
 
     @Override
-    public PlcReadRequest.Builder readRequestBuilder() {
-        return new DefaultPlcReadRequest.Builder(new S7PlcFieldHandler());
+    public Optional<PlcReadRequest.Builder> readRequestBuilder() {
+        return Optional.of(new DefaultPlcReadRequest.Builder(this, new S7PlcFieldHandler()));
     }
 
     @Override
-    public CompletableFuture<PlcReadResponse<?>> read(PlcReadRequest readRequest) {
+    public Optional<PlcWriteRequest.Builder> writeRequestBuilder() {
+        return Optional.of(new DefaultPlcWriteRequest.Builder(this, new S7PlcFieldHandler()));
+    }
+
+    @Override
+    public Optional<PlcSubscriptionRequest.Builder> subscriptionRequestBuilder() {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<PlcUnsubscriptionRequest.Builder> unsubscriptionRequestBuilder() {
+        return Optional.empty();
+    }
+
+    @Override
+    public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) {
         CompletableFuture<InternalPlcReadResponse> future = new CompletableFuture<>();
         PlcRequestContainer<InternalPlcReadRequest, InternalPlcReadResponse> container =
             new PlcRequestContainer<>((InternalPlcReadRequest) readRequest, future);
@@ -256,12 +269,7 @@ public void close() throws PlcConnectionException {
     }
 
     @Override
-    public PlcWriteRequest.Builder writeRequestBuilder() {
-        return new DefaultPlcWriteRequest.Builder(new S7PlcFieldHandler());
-    }
-
-    @Override
-    public CompletableFuture<PlcWriteResponse<?>> write(PlcWriteRequest writeRequest) {
+    public CompletableFuture<PlcWriteResponse> write(PlcWriteRequest writeRequest) {
         CompletableFuture<InternalPlcWriteResponse> future = new CompletableFuture<>();
         PlcRequestContainer<InternalPlcWriteRequest, InternalPlcWriteResponse> container =
             new PlcRequestContainer<>((InternalPlcWriteRequest) writeRequest, future);
diff --git a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/model/messages/S7Message.java b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/model/messages/S7Message.java
index 75a7df9f2..c554e44a2 100644
--- a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/model/messages/S7Message.java
+++ b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/model/messages/S7Message.java
@@ -18,7 +18,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.java.s7.netty.model.messages;
 
-import org.apache.plc4x.java.api.messages.PlcProtocolMessage;
+import org.apache.plc4x.java.base.messages.PlcProtocolMessage;
 import org.apache.plc4x.java.base.messages.PlcRawMessage;
 import org.apache.plc4x.java.s7.netty.model.params.S7Parameter;
 import org.apache.plc4x.java.s7.netty.model.payloads.S7Payload;
diff --git a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/model/messages/S7RequestMessage.java b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/model/messages/S7RequestMessage.java
index 5d03568ec..1fcd0efd7 100644
--- a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/model/messages/S7RequestMessage.java
+++ b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/model/messages/S7RequestMessage.java
@@ -18,7 +18,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.java.s7.netty.model.messages;
 
-import org.apache.plc4x.java.api.messages.PlcProtocolMessage;
+import org.apache.plc4x.java.base.messages.PlcProtocolMessage;
 import org.apache.plc4x.java.s7.netty.model.params.S7Parameter;
 import org.apache.plc4x.java.s7.netty.model.payloads.S7Payload;
 import org.apache.plc4x.java.s7.netty.model.types.MessageType;
diff --git a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/model/messages/SetupCommunicationRequestMessage.java b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/model/messages/SetupCommunicationRequestMessage.java
index 244ac8783..8194fa8b0 100644
--- a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/model/messages/SetupCommunicationRequestMessage.java
+++ b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/model/messages/SetupCommunicationRequestMessage.java
@@ -18,7 +18,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.java.s7.netty.model.messages;
 
-import org.apache.plc4x.java.api.messages.PlcProtocolMessage;
+import org.apache.plc4x.java.base.messages.PlcProtocolMessage;
 import org.apache.plc4x.java.s7.netty.model.params.SetupCommunicationParameter;
 import org.apache.plc4x.java.s7.netty.model.types.MessageType;
 
diff --git a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/strategies/DefaultS7MessageProcessor.java b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/strategies/DefaultS7MessageProcessor.java
index 570195335..118143c37 100644
--- a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/strategies/DefaultS7MessageProcessor.java
+++ b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/strategies/DefaultS7MessageProcessor.java
@@ -21,7 +21,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.plc4x.java.api.exceptions.PlcException;
 import org.apache.plc4x.java.api.exceptions.PlcProtocolException;
-import org.apache.plc4x.java.api.messages.PlcProtocolMessage;
+import org.apache.plc4x.java.base.messages.PlcProtocolMessage;
 import org.apache.plc4x.java.s7.netty.model.messages.S7RequestMessage;
 import org.apache.plc4x.java.s7.netty.model.messages.S7ResponseMessage;
 import org.apache.plc4x.java.s7.netty.model.params.S7Parameter;
diff --git a/plc4j/protocols/s7/src/main/resources/META-INF/services/org.apache.plc4x.java.api.PlcDriver b/plc4j/protocols/s7/src/main/resources/META-INF/services/org.apache.plc4x.java.spi.PlcDriver
similarity index 100%
rename from plc4j/protocols/s7/src/main/resources/META-INF/services/org.apache.plc4x.java.api.PlcDriver
rename to plc4j/protocols/s7/src/main/resources/META-INF/services/org.apache.plc4x.java.spi.PlcDriver
diff --git a/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/S7PlcScanner.java b/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/S7PlcScanner.java
index bb7e369c7..c7ddebd9d 100644
--- a/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/S7PlcScanner.java
+++ b/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/S7PlcScanner.java
@@ -18,17 +18,10 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.java.s7;
 
-import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcReader;
-import org.apache.plc4x.java.api.model.PlcField;
-import org.apache.plc4x.java.s7.netty.model.types.MemoryArea;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Optional;
-
 public class S7PlcScanner {
 
     private static final Logger logger = LoggerFactory.getLogger(S7PlcScanner.class);
diff --git a/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/S7PlcTestConsole.java b/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/S7PlcTestConsole.java
index 06648b701..f33f563f6 100644
--- a/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/S7PlcTestConsole.java
+++ b/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/S7PlcTestConsole.java
@@ -18,14 +18,6 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.java.s7;
 
-import java.util.List;
-import java.util.Optional;
-import java.util.Scanner;
-
-import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcReader;
-import org.apache.plc4x.java.api.model.PlcField;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/S7PlcWriterSample.java b/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/S7PlcWriterSample.java
index 67a0d53be..48318eddb 100644
--- a/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/S7PlcWriterSample.java
+++ b/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/S7PlcWriterSample.java
@@ -18,15 +18,9 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.java.s7;
 
-import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcWriter;
-import org.apache.plc4x.java.api.model.PlcField;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Optional;
-
 public class S7PlcWriterSample {
 
     private static final Logger logger = LoggerFactory.getLogger(S7PlcWriterSample.class);
diff --git a/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/issues/PLC4X47Test.java b/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/issues/PLC4X47Test.java
index d1c9e8dbe..100a677b4 100644
--- a/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/issues/PLC4X47Test.java
+++ b/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/issues/PLC4X47Test.java
@@ -38,13 +38,13 @@ public void testLargeRequest() throws Exception {
         EmbeddedChannel channel = channelFactory.getChannel();*/
         S7PlcConnection connection = (S7PlcConnection) new PlcDriverManager().getConnection("s7://10.10.64.20/1/1");
 
-        PlcReadRequest.Builder builder = connection.readRequestBuilder();
+        PlcReadRequest.Builder builder = connection.readRequestBuilder().get();
         for (int i = 1; i <= 30; i++) {
             // just the first byte of each db
             builder.addItem("field-" + i, "%DB3.DB" + i + ":SINT");
         }
         PlcReadRequest readRequest = builder.build();
-        PlcReadResponse<?> readResponse = connection.read(readRequest).get();
+        PlcReadResponse readResponse = connection.read(readRequest).get();
         System.out.println(readResponse.getFieldNames().size());
     }
 
diff --git a/plc4j/protocols/src/site/asciidoc/developers/implementing-drivers.adoc b/plc4j/protocols/src/site/asciidoc/developers/implementing-drivers.adoc
index 99af2a1db..5c62a0321 100644
--- a/plc4j/protocols/src/site/asciidoc/developers/implementing-drivers.adoc
+++ b/plc4j/protocols/src/site/asciidoc/developers/implementing-drivers.adoc
@@ -171,14 +171,14 @@ It allows to manually send and receive (binary) data to and from a pipeline for
 == Implementing a custom driver
 
 PLC4X's `DriverManager` finds its drivers by using the default `Java ServiceLoader`.
-This requires a file called `org.apache.plc4x.java.api.PlcDriver` in the `META-INF/services` directory of the drivers JAR file.
+This requires a file called `org.apache.plc4x.java.spi.PlcDriver` in the `META-INF/services` directory of the drivers JAR file.
 For each type of driver provided inside this JAR, one line has to be added to this file containing the fully qualified class name of the driver implementation.
 
 For the S7 driver for example all it contains is this line:
 
     org.apache.plc4x.java.s7.S7PlcDriver
 
-A driver implementation must implement the `org.apache.plc4x.java.api.PlcDriver` interface.
+A driver implementation must implement the `org.apache.plc4x.java.spi.PlcDriver` interface.
 This defines the necessary methods for the `DriverManager` to find the correct implementation and create a new connection instance.
 
 The important methods here are:
diff --git a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestConnection.java b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestConnection.java
index 911e05a62..32da5e961 100644
--- a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestConnection.java
+++ b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestConnection.java
@@ -20,13 +20,17 @@ Licensed to the Apache Software Foundation (ASF) under one
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.plc4x.java.api.connection.*;
+import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.base.messages.PlcReader;
+import org.apache.plc4x.java.base.messages.PlcWriter;
 import org.apache.plc4x.java.api.messages.*;
 import org.apache.plc4x.java.api.types.PlcResponseCode;
 import org.apache.plc4x.java.base.messages.*;
 import org.apache.plc4x.java.base.messages.items.FieldItem;
 
-import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -57,27 +61,27 @@ public void close() {
     }
 
     @Override
-    public Optional<PlcReader> getReader() {
-        return Optional.of(this);
+    public Optional<PlcReadRequest.Builder> readRequestBuilder() {
+        return Optional.of(new DefaultPlcReadRequest.Builder(this, new TestFieldHandler()));
     }
 
     @Override
-    public Optional<PlcWriter> getWriter() {
-        return Optional.of(this);
+    public Optional<PlcWriteRequest.Builder> writeRequestBuilder() {
+        return Optional.of(new DefaultPlcWriteRequest.Builder(this, new TestFieldHandler()));
     }
 
     @Override
-    public Optional<PlcSubscriber> getSubscriber() {
-        return Optional.empty(); // TODO: implement this
+    public Optional<PlcSubscriptionRequest.Builder> subscriptionRequestBuilder() {
+        return Optional.empty();
     }
 
     @Override
-    public PlcReadRequest.Builder readRequestBuilder() {
-        return new DefaultPlcReadRequest.Builder(new TestFieldHandler());
+    public Optional<PlcUnsubscriptionRequest.Builder> unsubscriptionRequestBuilder() {
+        return Optional.empty();
     }
 
     @Override
-    public CompletableFuture<PlcReadResponse<?>> read(PlcReadRequest readRequest) {
+    public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) {
         if(!(readRequest instanceof InternalPlcReadRequest)) {
             throw new IllegalArgumentException("Read request doesn't implement InternalPlcReadRequest");
         }
@@ -93,17 +97,12 @@ public void close() {
                 : new ImmutablePair<>(PlcResponseCode.NOT_FOUND, null);
             fields.put(fieldName, fieldPair);
         }
-        PlcReadResponse<?> response = new DefaultPlcReadResponse(request, fields);
+        PlcReadResponse response = new DefaultPlcReadResponse(request, fields);
         return CompletableFuture.completedFuture(response);
     }
 
     @Override
-    public PlcWriteRequest.Builder writeRequestBuilder() {
-        return new DefaultPlcWriteRequest.Builder(new TestFieldHandler());
-    }
-
-    @Override
-    public CompletableFuture<PlcWriteResponse<?>> write(PlcWriteRequest writeRequest) {
+    public CompletableFuture<PlcWriteResponse> write(PlcWriteRequest writeRequest) {
         if(!(writeRequest instanceof InternalPlcWriteRequest)) {
             throw new IllegalArgumentException("Read request doesn't implement InternalPlcWriteRequest");
         }
@@ -115,7 +114,7 @@ public void close() {
             device.set(field, fieldItem);
             fields.put(fieldName, PlcResponseCode.OK);
         }
-        PlcWriteResponse<?> response = new DefaultPlcWriteResponse(request, fields);
+        PlcWriteResponse response = new DefaultPlcWriteResponse(request, fields);
         return CompletableFuture.completedFuture(response);
     }
 
diff --git a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestPlcDriver.java b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestPlcDriver.java
index d4f2138b4..d5cec02dd 100644
--- a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestPlcDriver.java
+++ b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestPlcDriver.java
@@ -18,9 +18,9 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.java.test;
 
-import org.apache.plc4x.java.api.PlcDriver;
+import org.apache.plc4x.java.spi.PlcDriver;
 import org.apache.plc4x.java.api.authentication.PlcAuthentication;
-import org.apache.plc4x.java.api.connection.PlcConnection;
+import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 
 /**
diff --git a/plc4j/protocols/test/src/main/resources/META-INF/services/org.apache.plc4x.java.api.PlcDriver b/plc4j/protocols/test/src/main/resources/META-INF/services/org.apache.plc4x.java.spi.PlcDriver
similarity index 100%
rename from plc4j/protocols/test/src/main/resources/META-INF/services/org.apache.plc4x.java.api.PlcDriver
rename to plc4j/protocols/test/src/main/resources/META-INF/services/org.apache.plc4x.java.spi.PlcDriver


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services