You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2018/10/17 16:48:18 UTC

[incubator-plc4x] 11/19: adjusted connectors and examples to the new api

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit eb079f92a09404df93545c436cc2ff5d71353918
Author: Andrey Skorikov <an...@codecentric.de>
AuthorDate: Thu Oct 4 15:54:51 2018 +0200

    adjusted connectors and examples to the new api
---
 .../azure/iothub/S7PlcToAzureIoTHubSample.java     |  5 +---
 .../google/iotcore/S7PlcToGoogleIoTCoreSample.java |  5 +---
 .../plc4x/java/examples/helloplc4x/HelloPlc4x.java | 22 +++++++-------
 .../java/org/apache/plc4x/camel/Plc4XConsumer.java | 17 +++--------
 .../apache/plc4x/camel/Plc4XPollingConsumer.java   | 14 +++++----
 .../java/org/apache/plc4x/camel/Plc4XProducer.java |  4 +--
 .../java/org/apache/plc4x/camel/MockDriver.java    | 11 ++-----
 .../org/apache/plc4x/camel/Plc4XProducerTest.java  |  3 --
 .../apache/plc4x/edgent/PlcConnectionAdapter.java  | 14 ++-------
 .../java/org/apache/plc4x/kafka/Plc4xSinkTask.java |  9 +++---
 .../org/apache/plc4x/kafka/Plc4xSourceTask.java    | 23 +++++++--------
 .../org/apache/plc4x/nifi/Plc4xSinkProcessor.java  |  8 ++---
 .../apache/plc4x/nifi/Plc4xSourceProcessor.java    | 24 +++++++--------
 .../plc4x/java/api/connection/PlcConnection.java   |  8 -----
 .../plc4x/java/api/connection/PlcReader.java       |  2 --
 .../plc4x/java/api/connection/PlcSubscriber.java   |  4 ---
 .../plc4x/java/api/connection/PlcWriter.java       |  2 --
 .../apache/plc4x/java/ads/ManualPlc4XAdsTest.java  | 13 ++++-----
 .../base/connection/AbstractPlcConnection.java     | 28 ------------------
 .../java/ethernetip/ManualPlc4XEtherNetIpTest.java |  8 ++---
 .../plc4x/java/modbus/ManualPlc4XModbusTest.java   | 34 +++++++++-------------
 .../org/apache/plc4x/java/test/TestConnection.java | 16 ----------
 22 files changed, 83 insertions(+), 191 deletions(-)

diff --git a/examples/azure/src/main/java/org/apache/plc4x/java/examples/azure/iothub/S7PlcToAzureIoTHubSample.java b/examples/azure/src/main/java/org/apache/plc4x/java/examples/azure/iothub/S7PlcToAzureIoTHubSample.java
index a4f80cc..45ac6c6 100644
--- a/examples/azure/src/main/java/org/apache/plc4x/java/examples/azure/iothub/S7PlcToAzureIoTHubSample.java
+++ b/examples/azure/src/main/java/org/apache/plc4x/java/examples/azure/iothub/S7PlcToAzureIoTHubSample.java
@@ -23,7 +23,6 @@ import com.microsoft.azure.sdk.iot.device.IotHubClientProtocol;
 import com.microsoft.azure.sdk.iot.device.Message;
 import org.apache.plc4x.java.PlcDriverManager;
 import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcReader;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
 import org.slf4j.Logger;
@@ -62,15 +61,13 @@ public class S7PlcToAzureIoTHubSample {
             DeviceClient client = new DeviceClient(iotConnectionString, IotHubClientProtocol.MQTT);
             client.open();
 
-            // Get a reader instance.
-            PlcReader plcReader = plcConnection.getReader().orElseThrow(IllegalStateException::new);
 
             // Prepare a read request.
             PlcReadRequest request = plcConnection.readRequestBuilder().get().addItem(FIELD_NAME, addressString).build();
 
             while (!Thread.currentThread().isInterrupted()) {
                 // Simulate telemetry.
-                PlcReadResponse response = plcReader.read(request).get();
+                PlcReadResponse response = request.execute().get();
                 response.getAllLongs(FIELD_NAME)
                     .forEach(longValue -> {
                             String result = Long.toBinaryString(longValue);
diff --git a/examples/google/src/main/java/org/apache/plc4x/java/examples/google/iotcore/S7PlcToGoogleIoTCoreSample.java b/examples/google/src/main/java/org/apache/plc4x/java/examples/google/iotcore/S7PlcToGoogleIoTCoreSample.java
index fba969c..c6aac9b 100644
--- a/examples/google/src/main/java/org/apache/plc4x/java/examples/google/iotcore/S7PlcToGoogleIoTCoreSample.java
+++ b/examples/google/src/main/java/org/apache/plc4x/java/examples/google/iotcore/S7PlcToGoogleIoTCoreSample.java
@@ -23,7 +23,6 @@ import io.jsonwebtoken.Jwts;
 import io.jsonwebtoken.SignatureAlgorithm;
 import org.apache.plc4x.java.PlcDriverManager;
 import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcReader;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
 import org.eclipse.paho.client.mqttv3.*;
@@ -233,13 +232,11 @@ public class S7PlcToGoogleIoTCoreSample {
         try (PlcConnection plcConnection = new PlcDriverManager().getConnection("s7://10.10.64.20/1/1")) {
             logger.info("Connected");
 
-            PlcReader plcReader = plcConnection.getReader().orElseThrow(IllegalAccessError::new);
-
             PlcReadRequest readRequest = plcConnection.readRequestBuilder().get().addItem("outputs", "OUTPUTS/0").build();
 
             while (!Thread.currentThread().isInterrupted()) {
 
-                PlcReadResponse plcReadResponse = plcReader.read(readRequest).get();
+                PlcReadResponse plcReadResponse = readRequest.execute().get();
 
                 // Refresh the connection credentials before the JWT expires.
                 // [START iot_mqtt_jwt_refresh]
diff --git a/examples/hello-plc4x/src/main/java/org/apache/plc4x/java/examples/helloplc4x/HelloPlc4x.java b/examples/hello-plc4x/src/main/java/org/apache/plc4x/java/examples/helloplc4x/HelloPlc4x.java
index 402c856..05190e9 100644
--- a/examples/hello-plc4x/src/main/java/org/apache/plc4x/java/examples/helloplc4x/HelloPlc4x.java
+++ b/examples/hello-plc4x/src/main/java/org/apache/plc4x/java/examples/helloplc4x/HelloPlc4x.java
@@ -20,14 +20,12 @@ package org.apache.plc4x.java.examples.helloplc4x;
 
 import org.apache.plc4x.java.PlcDriverManager;
 import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcReader;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
 import org.apache.plc4x.java.api.types.PlcResponseCode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 public class HelloPlc4x {
@@ -49,34 +47,36 @@ public class HelloPlc4x {
         // Establish a connection to the plc using the url provided as first argument
         try (PlcConnection plcConnection = new PlcDriverManager().getConnection(args[0])) {
 
-            Optional<PlcReader> reader = plcConnection.getReader();
-
             // Check if this connection support reading of data.
-            if (reader.isPresent()) {
-                PlcReader plcReader = reader.get();
+            if (plcConnection.readRequestBuilder().isPresent()) {
 
                 // Create a new read request:
                 // - Give the single item requested the alias name "value"
-                PlcReadRequest.Builder builder = plcConnection.readRequestBuilder().get();
+                PlcReadRequest.Builder syncBuilder = plcConnection.readRequestBuilder().get();
                 for (int i = 1; i < args.length; i++) {
-                    builder.addItem("value-" + i, args[i]);
+                    syncBuilder.addItem("value-" + i, args[i]);
                 }
-                PlcReadRequest plcReadRequest = builder.build();
+                PlcReadRequest syncPlcReadRequest = syncBuilder.build();
 
                 //////////////////////////////////////////////////////////
                 // Read synchronously ...
                 // NOTICE: the ".get()" immediately lets this thread pause till
                 // the response is processed and available.
                 System.out.println("\nSynchronous request ...");
-                PlcReadResponse syncResponse = plcReader.read(plcReadRequest).get();
+                PlcReadResponse syncResponse = syncPlcReadRequest.execute().get();
                 // Simply iterating over the field names returned in the response.
                 printResponse(syncResponse);
 
                 //////////////////////////////////////////////////////////
                 // Read asynchronously ...
                 // Register a callback executed as soon as a response arives.
+                PlcReadRequest.Builder asyncBuilder = plcConnection.readRequestBuilder().get();
+                for (int i = 1; i < args.length; i++) {
+                    asyncBuilder.addItem("value-" + i, args[i]);
+                }
+                PlcReadRequest asyncPlcReadRequest = asyncBuilder.build();
                 System.out.println("\n\nAsynchronous request ...");
-                CompletableFuture<PlcReadResponse> asyncResponse = plcReader.read(plcReadRequest);
+                CompletableFuture<? extends PlcReadResponse> asyncResponse = asyncPlcReadRequest.execute();
                 asyncResponse.whenComplete((readResponse, throwable) -> {
                     if (readResponse != null) {
                         printResponse(syncResponse);
diff --git a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
index 9ec281c..e2f983c 100644
--- a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
+++ b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
@@ -24,7 +24,6 @@ import org.apache.camel.support.LoggingExceptionHandler;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorConverterHelper;
 import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcSubscriber;
 import org.apache.plc4x.java.api.exceptions.PlcException;
 import org.apache.plc4x.java.api.messages.*;
 import org.slf4j.Logger;
@@ -49,7 +48,6 @@ public class Plc4XConsumer extends ServiceSupport implements Consumer, java.util
     private Class<?> dataType;
     private PlcSubscriptionResponse subscriptionResponse;
 
-
     public Plc4XConsumer(Plc4XEndpoint endpoint, Processor processor) throws PlcException {
         this.endpoint = endpoint;
         this.dataType = endpoint.getDataType();
@@ -80,22 +78,19 @@ public class Plc4XConsumer extends ServiceSupport implements Consumer, java.util
 
     @Override
     protected void doStart() throws InterruptedException, ExecutionException, PlcException {
-        PlcSubscriber plcSubscriber = plcConnection.getSubscriber().orElseThrow(
-            () -> new PlcException("Connection doesn't support subscriptions."));
         // TODO: Is it correct to only support one field?
         PlcSubscriptionRequest request = plcConnection.subscriptionRequestBuilder().get()
             .addCyclicField("default", fieldQuery, Duration.of(3, ChronoUnit.SECONDS)).build();
-        PlcSubscriptionResponse plcSubscriptionResponse = plcSubscriber.subscribe(request).get();
+        subscriptionResponse = request.execute().get();
         // TODO: we need to return the plcSubscriptionResponse here too as we need this to unsubscribe...
-        plcSubscriber.register(this, plcSubscriptionResponse.getSubscriptionHandles());
+        // TODO: figure out what to do with this
+        // plcSubscriber.register(this, plcSubscriptionResponse.getSubscriptionHandles());
     }
 
     @Override
     protected void doStop() throws InterruptedException, ExecutionException, TimeoutException, PlcException {
-        PlcSubscriber plcSubscriber = plcConnection.getSubscriber().orElseThrow(
-            () -> new PlcException("Connection doesn't support subscriptions."));
         PlcUnsubscriptionRequest request = plcConnection.unsubscriptionRequestBuilder().get().addHandles(subscriptionResponse.getSubscriptionHandles()).build();
-        CompletableFuture<PlcUnsubscriptionResponse> unsubscriptionFuture = plcSubscriber.unsubscribe(request);
+        CompletableFuture<? extends PlcUnsubscriptionResponse> unsubscriptionFuture = request.execute();
         PlcUnsubscriptionResponse unsubscriptionResponse = unsubscriptionFuture.get(5, TimeUnit.SECONDS);
         // TODO: Handle the response ...
         try {
@@ -105,10 +100,6 @@ public class Plc4XConsumer extends ServiceSupport implements Consumer, java.util
         }
     }
 
-    private PlcSubscriber getSubscriber() {
-        return plcConnection.getSubscriber().orElseThrow(() -> new RuntimeException("No subscriber available"));
-    }
-
     @Override
     public void accept(PlcSubscriptionEvent plcSubscriptionEvent) {
         LOGGER.debug("Received {}", plcSubscriptionEvent);
diff --git a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XPollingConsumer.java b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XPollingConsumer.java
index 4aee7ad..3e90b44 100644
--- a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XPollingConsumer.java
+++ b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XPollingConsumer.java
@@ -44,8 +44,7 @@ public class Plc4XPollingConsumer extends ServiceSupport implements PollingConsu
     private Plc4XEndpoint endpoint;
     private ExceptionHandler exceptionHandler;
     private PlcConnection plcConnection;
-    private PlcReader plcReader;
-    private PlcReadRequest readRequest;
+    private PlcReadRequest.Builder requestBuilder;
     private Class dataType;
 
     public Plc4XPollingConsumer(Plc4XEndpoint endpoint) throws PlcException {
@@ -54,8 +53,7 @@ public class Plc4XPollingConsumer extends ServiceSupport implements PollingConsu
         this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass());
         String plc4xURI = endpoint.getEndpointUri().replaceFirst("plc4x:/?/?", "");
         this.plcConnection = endpoint.getPlcDriverManager().getConnection(plc4xURI);
-        this.plcReader = plcConnection.getReader().orElseThrow(() -> new PlcException("This connection doesn't support reading."));
-        readRequest = plcConnection.readRequestBuilder().get().addItem("default", endpoint.getAddress()).build();
+        this.requestBuilder = plcConnection.readRequestBuilder().orElseThrow(() -> new PlcException("This connection doesn't support reading."));
     }
 
     @Override
@@ -79,7 +77,7 @@ public class Plc4XPollingConsumer extends ServiceSupport implements PollingConsu
     @Override
     public Exchange receive() {
         Exchange exchange = endpoint.createExchange();
-        CompletableFuture<? extends PlcReadResponse> read = plcReader.read(readRequest);
+        CompletableFuture<? extends PlcReadResponse> read = createReadRequest().execute();
         try {
             PlcReadResponse plcReadResponse = read.get();
             exchange.getIn().setBody(unwrapIfSingle(plcReadResponse.getAllObjects("default")));
@@ -97,7 +95,7 @@ public class Plc4XPollingConsumer extends ServiceSupport implements PollingConsu
     @Override
     public Exchange receive(long timeout) {
         Exchange exchange = endpoint.createExchange();
-        CompletableFuture<? extends PlcReadResponse> read = plcReader.read(readRequest);
+        CompletableFuture<? extends PlcReadResponse> read = createReadRequest().execute();
         try {
             PlcReadResponse plcReadResponse = read.get(timeout, TimeUnit.MILLISECONDS);
             exchange.getIn().setBody(unwrapIfSingle(plcReadResponse.getAllObjects("default")));
@@ -121,6 +119,10 @@ public class Plc4XPollingConsumer extends ServiceSupport implements PollingConsu
         }
     }
 
+    private PlcReadRequest createReadRequest() {
+        return requestBuilder.addItem("default", endpoint.getAddress()).build();
+    }
+
     private Object unwrapIfSingle(Collection collection) {
         if (collection.isEmpty()) {
             return null;
diff --git a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XProducer.java b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XProducer.java
index b4adbcc..f43e603 100644
--- a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XProducer.java
+++ b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XProducer.java
@@ -23,7 +23,6 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.impl.DefaultAsyncProducer;
 import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcWriter;
 import org.apache.plc4x.java.api.exceptions.PlcException;
 import org.apache.plc4x.java.api.messages.PlcWriteRequest;
 import org.apache.plc4x.java.api.messages.PlcWriteResponse;
@@ -60,9 +59,8 @@ public class Plc4XProducer extends DefaultAsyncProducer {
             Object value = in.getBody(Object.class);
 //            builder.addItem(fieldName, fieldQuery, value);
         }
-        PlcWriter plcWriter = plcConnection.getWriter().orElseThrow(() -> new IllegalArgumentException("Writer for driver not found"));
         PlcWriteRequest.Builder builder = plcConnection.writeRequestBuilder().orElseThrow(() -> new IllegalArgumentException("Writer for driver not found"));
-        CompletableFuture<? extends PlcWriteResponse> completableFuture = plcWriter.write(builder.build());
+        CompletableFuture<? extends PlcWriteResponse> completableFuture = builder.build().execute();
         int currentlyOpenRequests = openRequests.incrementAndGet();
         try {
             log.debug("Currently open requests including {}:{}", exchange, currentlyOpenRequests);
diff --git a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java
index 9a3ea9e..2be45b2 100644
--- a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java
+++ b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java
@@ -21,13 +21,8 @@ package org.apache.plc4x.camel;
 import org.apache.plc4x.java.api.PlcDriver;
 import org.apache.plc4x.java.api.authentication.PlcAuthentication;
 import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcReader;
 import org.apache.plc4x.java.api.connection.PlcSubscriber;
-import org.apache.plc4x.java.api.connection.PlcWriter;
-import org.apache.plc4x.java.api.messages.PlcReadRequest;
-import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
-import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
-import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.java.api.messages.*;
 import org.apache.plc4x.java.base.messages.DefaultPlcSubscriptionResponse;
 import org.apache.plc4x.java.base.messages.InternalPlcSubscriptionRequest;
 import org.slf4j.Logger;
@@ -61,11 +56,10 @@ public class MockDriver implements PlcDriver {
     public PlcConnection connect(String url) {
         // Mock a connection.
         PlcConnection plcConnectionMock = mock(PlcConnection.class, RETURNS_DEEP_STUBS);
-        when(plcConnectionMock.getWriter()).thenReturn(Optional.of(mock(PlcWriter.class, RETURNS_DEEP_STUBS)));
-        when(plcConnectionMock.getReader()).thenReturn(Optional.of(mock(PlcReader.class, RETURNS_DEEP_STUBS)));
         when(plcConnectionMock.readRequestBuilder()).thenReturn(Optional.of(mock(PlcReadRequest.Builder.class, RETURNS_DEEP_STUBS)));
         when(plcConnectionMock.writeRequestBuilder()).thenReturn(Optional.of(mock(PlcWriteRequest.Builder.class, RETURNS_DEEP_STUBS)));
         when(plcConnectionMock.subscriptionRequestBuilder()).thenReturn(Optional.of(mock(PlcSubscriptionRequest.Builder.class, RETURNS_DEEP_STUBS)));
+        when(plcConnectionMock.unsubscriptionRequestBuilder()).thenReturn(Optional.of(mock(PlcUnsubscriptionRequest.Builder.class, RETURNS_DEEP_STUBS)));
 
         // Mock a typical subscriber.
         PlcSubscriber plcSubscriber = mock(PlcSubscriber.class, RETURNS_DEEP_STUBS);
@@ -97,7 +91,6 @@ public class MockDriver implements PlcDriver {
             responseFuture.complete(response);
             return responseFuture;
         });
-        when(plcConnectionMock.getSubscriber()).thenReturn(Optional.of(plcSubscriber));
         return plcConnectionMock;
     }
 
diff --git a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XProducerTest.java b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XProducerTest.java
index 60706d6..0068a11 100644
--- a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XProducerTest.java
+++ b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XProducerTest.java
@@ -48,9 +48,6 @@ public class Plc4XProducerTest {
         when(endpointMock.getEndpointUri()).thenReturn("plc4x:mock:10.10.10.1/1/1");
         PlcDriverManager plcDriverManagerMock = mock(PlcDriverManager.class, RETURNS_DEEP_STUBS);
 
-        when(plcDriverManagerMock.getConnection(anyString()).getWriter())
-            .thenReturn(Optional.of(mock(PlcWriter.class, RETURNS_DEEP_STUBS)));
-
         when(plcDriverManagerMock.getConnection(anyString()).writeRequestBuilder())
             .thenReturn(Optional.of(mock(PlcWriteRequest.Builder.class, RETURNS_DEEP_STUBS)));
 
diff --git a/integrations/apache-edgent/src/main/java/org/apache/plc4x/edgent/PlcConnectionAdapter.java b/integrations/apache-edgent/src/main/java/org/apache/plc4x/edgent/PlcConnectionAdapter.java
index 4e1fbcd..e3e23cc 100644
--- a/integrations/apache-edgent/src/main/java/org/apache/plc4x/edgent/PlcConnectionAdapter.java
+++ b/integrations/apache-edgent/src/main/java/org/apache/plc4x/edgent/PlcConnectionAdapter.java
@@ -24,8 +24,6 @@ import org.apache.edgent.function.Function;
 import org.apache.edgent.function.Supplier;
 import org.apache.plc4x.java.PlcDriverManager;
 import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcReader;
-import org.apache.plc4x.java.api.connection.PlcWriter;
 import org.apache.plc4x.java.api.exceptions.PlcException;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
@@ -117,9 +115,7 @@ public class PlcConnectionAdapter implements AutoCloseable {
                 PlcConnection connection = null;
                 try {
                     connection = getConnection();
-                    PlcReader reader = connection.getReader()
-                        .orElseThrow(() -> new PlcException("This connection doesn't support reading"));
-                    return reader.read(readRequest).get();
+                    return readRequest.execute().get();
                 } catch (Exception e) {
                     logger.error("reading from plc device {} {} failed", connection, readRequest, e);
                     return null;
@@ -154,10 +150,8 @@ public class PlcConnectionAdapter implements AutoCloseable {
             PlcField field = null;
             try {
                 connection = getConnection();
-                PlcReader reader = connection.getReader()
-                    .orElseThrow(() -> new PlcException("This connection doesn't support reading"));
                 PlcReadRequest readRequest = connection.readRequestBuilder().orElseThrow(() -> new PlcException("This connection doesn't support reading")).addItem(FIELD_NAME, fieldQuery).build();
-                PlcReadResponse readResponse = reader.read(readRequest).get();
+                PlcReadResponse readResponse = readRequest.execute().get();
                 Object value = null;
                 switch (clientDatatype) {
                     case BYTE:
@@ -219,12 +213,10 @@ public class PlcConnectionAdapter implements AutoCloseable {
             PlcConnection connection = null;
             try {
                 connection = getConnection();
-                PlcWriter writer = connection.getWriter()
-                    .orElseThrow(() -> new PlcException("This connection doesn't support writing"));
                 PlcWriteRequest.Builder builder = connection.writeRequestBuilder().orElseThrow(() -> new PlcException("This connection doesn't support writing"));
                 PlcWriteRequest writeRequest = builder.build();
                 addItem(builder, clientDatatype, fieldQuery, fieldValue);
-                writer.write(writeRequest).get();
+                writeRequest.execute().get();
             } catch (Exception e) {
                 logger.error("writing to plc device {} {} failed", connection, fieldQuery, e);
             }
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
index 6829294..7459ac5 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
@@ -24,7 +24,6 @@ import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.plc4x.java.PlcDriverManager;
 import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcWriter;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.messages.PlcWriteRequest;
 import org.apache.plc4x.kafka.util.VersionUtil;
@@ -37,7 +36,6 @@ public class Plc4xSinkTask extends SinkTask {
     private String url;
 
     private PlcConnection plcConnection;
-    private PlcWriter plcWriter;
 
     @Override
     public String version() {
@@ -51,8 +49,9 @@ public class Plc4xSinkTask extends SinkTask {
 
         openConnection();
 
-        plcWriter = plcConnection.getWriter()
-            .orElseThrow(() -> new ConnectException("PlcReader not available for this type of connection"));
+        if (!plcConnection.writeRequestBuilder().isPresent()) {
+            throw new ConnectException("Writing not supported on this connection");
+        }
     }
 
     @Override
@@ -107,7 +106,7 @@ public class Plc4xSinkTask extends SinkTask {
 
     private void doWrite(PlcWriteRequest request) {
         try {
-            plcWriter.write(request).get();
+            request.execute().get();
         } catch (ExecutionException | InterruptedException e) {
             throw new ConnectException("Caught exception during write", e);
         }
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
index 08b3ec1..7a2b21b 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
@@ -27,7 +27,6 @@ import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.plc4x.java.PlcDriverManager;
 import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcReader;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
@@ -61,8 +60,6 @@ public class Plc4xSourceTask extends SourceTask {
     private List<String> queries;
 
     private PlcConnection plcConnection;
-    private PlcReader plcReader;
-    private PlcReadRequest plcRequest;
 
     // TODO: should we use shared (static) thread pool for this?
     private ScheduledExecutorService scheduler;
@@ -82,15 +79,9 @@ public class Plc4xSourceTask extends SourceTask {
 
         openConnection();
 
-        plcReader = plcConnection.getReader()
-            .orElseThrow(() -> new ConnectException("PlcReader not available for this type of connection"));
-
-
-        PlcReadRequest.Builder builder = plcConnection.readRequestBuilder().get();
-        for (String query : queries) {
-            builder.addItem(query, query);
+        if (!plcConnection.readRequestBuilder().isPresent()) {
+            throw new ConnectException("Reading not supported on this connection");
         }
-        plcRequest = builder.build();
 
         int rate = Integer.valueOf(props.get(Plc4xSourceConnector.RATE_CONFIG));
         scheduler = Executors.newScheduledThreadPool(1);
@@ -154,7 +145,7 @@ public class Plc4xSourceTask extends SourceTask {
     }
 
     private List<SourceRecord> doFetch() throws InterruptedException {
-        final CompletableFuture<PlcReadResponse> response = plcReader.read(plcRequest);
+        final CompletableFuture<? extends PlcReadResponse> response = createReadRequest().execute();
         try {
             final PlcReadResponse received = response.get(TIMEOUT_LIMIT_MILLIS, TimeUnit.MILLISECONDS);
             return extractValues(received);
@@ -165,6 +156,14 @@ public class Plc4xSourceTask extends SourceTask {
         }
     }
 
+    private PlcReadRequest createReadRequest() {
+        PlcReadRequest.Builder builder = plcConnection.readRequestBuilder().get();
+        for (String query : queries) {
+            builder.addItem(query, query);
+        }
+        return builder.build();
+    }
+
     private List<SourceRecord> extractValues(PlcReadResponse response) {
         final List<SourceRecord> result = new LinkedList<>();
         for (String query : queries) {
diff --git a/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java b/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
index 973855b..a72ea23 100644
--- a/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
+++ b/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
@@ -29,7 +29,6 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcWriter;
 import org.apache.plc4x.java.api.messages.PlcWriteRequest;
 import org.apache.plc4x.java.api.messages.PlcWriteResponse;
 
@@ -53,8 +52,9 @@ public class Plc4xSinkProcessor extends BasePlc4xProcessor {
 
         // Get an instance of a component able to write to a PLC.
         PlcConnection connection = getConnection();
-        PlcWriter writer = connection.getWriter().orElseThrow(
-            () -> new ProcessException("Writing not supported by connection"));
+        if (!connection.writeRequestBuilder().isPresent()) {
+            throw new ProcessException("Writing not supported by connection");
+        }
 
         // Prepare the request.
         PlcWriteRequest.Builder builder = connection.writeRequestBuilder().get();
@@ -67,7 +67,7 @@ public class Plc4xSinkProcessor extends BasePlc4xProcessor {
         PlcWriteRequest writeRequest = builder.build();
 
         // Send the request to the PLC.
-        CompletableFuture<PlcWriteResponse> future = writer.write(writeRequest);
+        CompletableFuture<? extends PlcWriteResponse> future = writeRequest.execute();
         future.whenComplete((response, throwable) -> {
             if (throwable != null) {
                 session.transfer(session.create(), FAILURE);
diff --git a/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java b/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
index 190a00d..7254e90 100644
--- a/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
+++ b/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
@@ -28,7 +28,6 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcReader;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
 import org.json.simple.JSONObject;
@@ -47,23 +46,24 @@ public class Plc4xSourceProcessor extends BasePlc4xProcessor {
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         // Get an instance of a component able to read from a PLC.
         PlcConnection connection = getConnection();
-        PlcReader reader = connection.getReader().orElseThrow(
-            () -> new ProcessException("Writing not supported by connection"));
 
         // Prepare the request.
-        PlcReadRequest.Builder builder = connection.readRequestBuilder().get();
-        getFields().forEach(field -> {
-            String address = getAddress(field);
-            if(address != null) {
-                builder.addItem(field, address);
-            }
-        });
-        PlcReadRequest readRequest = builder.build();
+        if (!connection.readRequestBuilder().isPresent()) {
+            throw new ProcessException("Writing not supported by connection");
+        }
 
         FlowFile flowFile = session.create();
         session.append(flowFile, out -> {
             try {
-                PlcReadResponse response = reader.read(readRequest).get();
+                PlcReadRequest.Builder builder = connection.readRequestBuilder().get();
+                getFields().forEach(field -> {
+                    String address = getAddress(field);
+                    if(address != null) {
+                        builder.addItem(field, address);
+                    }
+                });
+                PlcReadRequest readRequest = builder.build();
+                PlcReadResponse response = readRequest.execute().get();
                 JSONObject obj = new JSONObject();
                 for (String fieldName : response.getFieldNames()) {
                     for(int i = 0; i < response.getNumberOfValues(fieldName); i++) {
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcConnection.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcConnection.java
index 66a3778..a6da03d 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcConnection.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcConnection.java
@@ -65,12 +65,4 @@ public interface PlcConnection extends AutoCloseable {
 
     Optional<PlcUnsubscriptionRequest.Builder> unsubscriptionRequestBuilder();
 
-    // the following methods should be moved to the SPI
-
-    Optional<PlcReader> getReader();
-
-    Optional<PlcWriter> getWriter();
-
-    Optional<PlcSubscriber> getSubscriber();
-
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcReader.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcReader.java
index 6d50da0..133868d 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcReader.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcReader.java
@@ -37,6 +37,4 @@ public interface PlcReader {
      */
     CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest);
 
-    //PlcReadRequest.Builder readRequestBuilder();
-
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java
index d1c97ba..4a3f6cc 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java
@@ -62,8 +62,4 @@ public interface PlcSubscriber {
      */
     void unregister(PlcConsumerRegistration registration);
 
-    //PlcSubscriptionRequest.Builder subscriptionRequestBuilder();
-
-    //PlcUnsubscriptionRequest.Builder unsubscriptionRequestBuilder();
-
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcWriter.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcWriter.java
index a4e44bf..db52de6 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcWriter.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcWriter.java
@@ -37,6 +37,4 @@ public interface PlcWriter {
      */
     CompletableFuture<PlcWriteResponse> write(PlcWriteRequest writeRequest);
 
-    //PlcWriteRequest.Builder writeRequestBuilder();
-
 }
diff --git a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
index 6e99861..283663a 100644
--- a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
+++ b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
@@ -43,22 +43,19 @@ public class ManualPlc4XAdsTest {
         try (PlcConnection plcConnection = new PlcDriverManager().getConnection(connectionUrl)) {
             System.out.println("PlcConnection " + plcConnection);
 
-            PlcReader reader = plcConnection.getReader().orElseThrow(() -> new RuntimeException("No Reader found"));
-
             PlcReadRequest readRequest = plcConnection.readRequestBuilder().get().addItem("station", "Allgemein_S2.Station:BYTE").build();
-            CompletableFuture<PlcReadResponse> response = reader.read(readRequest);
+            CompletableFuture<? extends PlcReadResponse> response = readRequest.execute();
             PlcReadResponse readResponse = response.get();
             System.out.println("Response " + readResponse);
             Collection<Integer> stations = readResponse.getAllIntegers("station");
             stations.forEach(System.out::println);
 
-            PlcSubscriber plcSubscriber = plcConnection.getSubscriber().orElseThrow(() -> new RuntimeException("Subscribe not available"));
-
             PlcSubscriptionRequest subscriptionRequest = plcConnection.subscriptionRequestBuilder().get().addChangeOfStateField("stationChange", "Allgemein_S2.Station:BYTE").build();
-            CompletableFuture<PlcSubscriptionResponse> subscribeResponse = plcSubscriber.subscribe(subscriptionRequest);
+            CompletableFuture<? extends PlcSubscriptionResponse> subscribeResponse = subscriptionRequest.execute();
             PlcSubscriptionResponse plcSubscriptionResponse = subscribeResponse.get();
 
-            PlcConsumerRegistration plcConsumerRegistration = plcSubscriber.register(System.out::println, plcSubscriptionResponse.getSubscriptionHandles());
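+            // TODO: consumer registration and unsubscription are disabled below because PlcSubscriber can no longer be obtained from PlcConnection; restore them once the request-based API offers a replacement.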
+            /*PlcConsumerRegistration plcConsumerRegistration = plcSubscriber.register(System.out::println, plcSubscriptionResponse.getSubscriptionHandles());
 
             TimeUnit.SECONDS.sleep(5);
 
@@ -68,7 +65,7 @@ public class ManualPlc4XAdsTest {
 
             unsubscriptionResponse
                 .get(5, TimeUnit.SECONDS);
-            System.out.println(unsubscriptionResponse);
+            System.out.println(unsubscriptionResponse);*/
         }
         System.exit(0);
     }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/connection/AbstractPlcConnection.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/connection/AbstractPlcConnection.java
index 8dcedd8..28be93f 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/connection/AbstractPlcConnection.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/connection/AbstractPlcConnection.java
@@ -23,14 +23,10 @@ import io.netty.channel.ChannelHandler;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
 import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcReader;
-import org.apache.plc4x.java.api.connection.PlcSubscriber;
-import org.apache.plc4x.java.api.connection.PlcWriter;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.exceptions.PlcIoException;
 
 import java.util.Objects;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
@@ -115,30 +111,6 @@ public abstract class AbstractPlcConnection implements PlcConnection {
         // Implemented in sub-classes, if needed.
     }
 
-    @Override
-    public Optional<PlcReader> getReader() {
-        if (this instanceof PlcReader) {
-            return Optional.of((PlcReader) this);
-        }
-        return Optional.empty();
-    }
-
-    @Override
-    public Optional<PlcWriter> getWriter() {
-        if (this instanceof PlcWriter) {
-            return Optional.of((PlcWriter) this);
-        }
-        return Optional.empty();
-    }
-
-    @Override
-    public Optional<PlcSubscriber> getSubscriber() {
-        if (this instanceof PlcSubscriber) {
-            return Optional.of((PlcSubscriber) this);
-        }
-        return Optional.empty();
-    }
-
     /**
      * Can be used to check and cast a parameter to its required internal type (can be used for general type checking too).
      *
diff --git a/plc4j/protocols/ethernetip/src/test/java/org/apache/plc4x/java/ethernetip/ManualPlc4XEtherNetIpTest.java b/plc4j/protocols/ethernetip/src/test/java/org/apache/plc4x/java/ethernetip/ManualPlc4XEtherNetIpTest.java
index 949eaeb..95cc511 100644
--- a/plc4j/protocols/ethernetip/src/test/java/org/apache/plc4x/java/ethernetip/ManualPlc4XEtherNetIpTest.java
+++ b/plc4j/protocols/ethernetip/src/test/java/org/apache/plc4x/java/ethernetip/ManualPlc4XEtherNetIpTest.java
@@ -20,7 +20,6 @@ package org.apache.plc4x.java.ethernetip;
 
 import org.apache.plc4x.java.PlcDriverManager;
 import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcReader;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
 
@@ -34,14 +33,11 @@ public class ManualPlc4XEtherNetIpTest {
         try (PlcConnection plcConnection = new PlcDriverManager().getConnection(connectionUrl)) {
             System.out.println("PlcConnection " + plcConnection);
 
-            // Get a reader instance.
-            PlcReader reader = plcConnection.getReader().orElseThrow(() -> new RuntimeException("No Reader found"));
-
-            PlcReadRequest readRequest = plcConnection.readRequestBuilder().get()
+            PlcReadRequest readRequest = plcConnection.readRequestBuilder().orElseThrow(() -> new RuntimeException("Reading not supported"))
                 .addItem("field", "#4#105#3").build();
 
             // Execute the read operation.
-            CompletableFuture<PlcReadResponse> response = reader.read(readRequest);
+            CompletableFuture<? extends PlcReadResponse> response = readRequest.execute();
             PlcReadResponse readResponse = response.get();
 
             // Output the response.
diff --git a/plc4j/protocols/modbus/src/test/java/org/apache/plc4x/java/modbus/ManualPlc4XModbusTest.java b/plc4j/protocols/modbus/src/test/java/org/apache/plc4x/java/modbus/ManualPlc4XModbusTest.java
index 3f6c786..910942a 100644
--- a/plc4j/protocols/modbus/src/test/java/org/apache/plc4x/java/modbus/ManualPlc4XModbusTest.java
+++ b/plc4j/protocols/modbus/src/test/java/org/apache/plc4x/java/modbus/ManualPlc4XModbusTest.java
@@ -21,8 +21,6 @@ package org.apache.plc4x.java.modbus;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.plc4x.java.PlcDriverManager;
 import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcReader;
-import org.apache.plc4x.java.api.connection.PlcWriter;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
 import org.apache.plc4x.java.api.messages.PlcWriteRequest;
@@ -50,10 +48,9 @@ public class ManualPlc4XModbusTest {
             System.out.println("PlcConnection " + plcConnection);
 
             {
-                PlcReader reader = plcConnection.getReader().orElseThrow(() -> new RuntimeException("No Reader found"));
-
-                PlcReadRequest readRequest = plcConnection.readRequestBuilder().get().addItem("randomRegister", "register:7[3]").build();
-                PlcReadResponse readResponse = reader.read(readRequest).get();
+                PlcReadRequest readRequest = plcConnection.readRequestBuilder().orElseThrow(() -> new RuntimeException("No Reader found"))
+                .addItem("randomRegister", "register:7[3]").build();
+                PlcReadResponse readResponse = readRequest.execute().get();
                 System.out.println("Response " + readResponse);
                 readResponse.getAllByteArrays("randomRegister").stream()
                     .map(HexUtil::toHex)
@@ -63,11 +60,11 @@ public class ManualPlc4XModbusTest {
 
             {
                 // Read an int from 2 registers
-                PlcReader reader = plcConnection.getReader().orElseThrow(() -> new RuntimeException("No Reader found"));
 
                 // Just dump the actual values
-                PlcReadRequest readRequest = plcConnection.readRequestBuilder().get().addItem("randomRegister", "register:3[2]").build();
-                PlcReadResponse readResponse = reader.read(readRequest).get();
+                PlcReadRequest readRequest = plcConnection.readRequestBuilder().orElseThrow(() -> new RuntimeException("No Reader found"))
+                    .addItem("randomRegister", "register:3[2]").build();
+                PlcReadResponse readResponse = readRequest.execute().get();
                 System.out.println("Response " + readResponse);
                 Collection<Byte[]> randomRegisters = readResponse.getAllByteArrays("randomRegister");
                 randomRegisters.stream()
@@ -87,17 +84,16 @@ public class ManualPlc4XModbusTest {
 
             {
                 // Read an int from 2 registers and multiple requests
-                PlcReader reader = plcConnection.getReader().orElseThrow(() -> new RuntimeException("No Reader found"));
 
                 // Just dump the actual values
-                PlcReadRequest readRequest = plcConnection.readRequestBuilder().get()
+                PlcReadRequest readRequest = plcConnection.readRequestBuilder().orElseThrow(() -> new RuntimeException("No Reader found"))
                     .addItem("randomRegister1", "register:1[2]")
                     .addItem("randomRegister2", "register:10[3]")
                     .addItem("randomRegister3", "register:20[4]")
                     .addItem("randomRegister4", "register:30[5]")
                     .addItem("randomRegister5", "register:40[6]")
                     .build();
-                PlcReadResponse readResponse = reader.read(readRequest).get();
+                PlcReadResponse readResponse = readRequest.execute().get();
                 System.out.println("Response " + readResponse);
                 IntStream.range(1, 6).forEach(i -> {
                     Collection<Byte[]> randomRegisters = readResponse.getAllByteArrays("randomRegister" + i);
@@ -118,10 +114,9 @@ public class ManualPlc4XModbusTest {
             }
 
             {
-                PlcReader reader = plcConnection.getReader().orElseThrow(() -> new RuntimeException("No Reader found"));
-
-                PlcReadRequest readRequest = plcConnection.readRequestBuilder().get().addItem("randomCoil", "coil:1[9]").build();
-                PlcReadResponse readResponse = reader.read(readRequest).get();
+                PlcReadRequest readRequest = plcConnection.readRequestBuilder().orElseThrow(() -> new RuntimeException("No Reader found"))
+                    .addItem("randomCoil", "coil:1[9]").build();
+                PlcReadResponse readResponse = readRequest.execute().get();
                 System.out.println("Response " + readResponse);
                 readResponse.getAllBooleans("randomCoil").stream()
                     .map(hex -> "Coil Value: " + hex)
@@ -129,10 +124,9 @@ public class ManualPlc4XModbusTest {
             }
 
             {
-                PlcWriter writer = plcConnection.getWriter().orElseThrow(() -> new RuntimeException("No Writer found"));
-
-                PlcWriteRequest writeRequest = plcConnection.writeRequestBuilder().get().addItem("randomCoilField", "coil:1", true).build();
-                PlcWriteResponse writeResponse = writer.write(writeRequest).get();
+                PlcWriteRequest writeRequest = plcConnection.writeRequestBuilder().orElseThrow(() -> new RuntimeException("No Writer found"))
+                    .addItem("randomCoilField", "coil:1", true).build();
+                PlcWriteResponse writeResponse = writeRequest.execute().get();
                 System.out.println("Response " + writeResponse);
             }
         } catch (Exception e) {
diff --git a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestConnection.java b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestConnection.java
index 68b1ba3..4c5764b 100644
--- a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestConnection.java
+++ b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestConnection.java
@@ -22,7 +22,6 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.plc4x.java.api.connection.PlcConnection;
 import org.apache.plc4x.java.api.connection.PlcReader;
-import org.apache.plc4x.java.api.connection.PlcSubscriber;
 import org.apache.plc4x.java.api.connection.PlcWriter;
 import org.apache.plc4x.java.api.messages.*;
 import org.apache.plc4x.java.api.types.PlcResponseCode;
@@ -62,21 +61,6 @@ class TestConnection implements PlcConnection, PlcReader, PlcWriter {
     }
 
     @Override
-    public Optional<PlcReader> getReader() {
-        return Optional.of(this);
-    }
-
-    @Override
-    public Optional<PlcWriter> getWriter() {
-        return Optional.of(this);
-    }
-
-    @Override
-    public Optional<PlcSubscriber> getSubscriber() {
-        return Optional.empty(); // TODO: implement this
-    }
-
-    @Override
     public Optional<PlcReadRequest.Builder> readRequestBuilder() {
         return Optional.of(new DefaultPlcReadRequest.Builder(this, new TestFieldHandler()));
     }