You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2018/10/17 16:48:09 UTC

[incubator-plc4x] 02/19: implemented new api in driver base

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit edc56be4efffde229e524be60d548505e48264bf
Author: Andrey Skorikov <an...@codecentric.de>
AuthorDate: Thu Oct 4 11:09:02 2018 +0200

    implemented new api in driver base
---
 .../messages/DefaultPlcProprietaryRequest.java     |  9 +++
 .../messages/DefaultPlcProprietaryResponse.java    |  6 +-
 .../java/base/messages/DefaultPlcReadRequest.java  | 22 +++++++-
 .../messages/DefaultPlcSubscriptionRequest.java    | 20 ++++++-
 .../messages/DefaultPlcUnsubscriptionRequest.java  | 19 ++++++-
 .../java/base/messages/DefaultPlcWriteRequest.java | 18 +++++-
 .../base/messages/InternalPlcFieldResponse.java    |  2 +-
 .../messages/InternalPlcProprietaryResponse.java   |  2 +-
 .../base/messages/InternalPlcReadResponse.java     |  2 +-
 .../java/base/messages/InternalPlcResponse.java    |  3 +-
 .../base/messages/InternalPlcWriteResponse.java    |  2 +-
 .../SingleItemToSingleRequestProtocol.java         | 65 ++++++++++++----------
 .../SingleItemToSingleRequestProtocolTest.java     | 60 ++++++++++++--------
 13 files changed, 160 insertions(+), 70 deletions(-)

diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcProprietaryRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcProprietaryRequest.java
index fa716f0..4e73899 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcProprietaryRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcProprietaryRequest.java
@@ -18,8 +18,17 @@
  */
 package org.apache.plc4x.java.base.messages;
 
+import org.apache.plc4x.java.api.messages.PlcResponse;
+
+import java.util.concurrent.CompletableFuture;
+
 public class DefaultPlcProprietaryRequest<REQUEST> implements InternalPlcProprietaryRequest<REQUEST> {
 
+    @Override
+    public CompletableFuture<PlcResponse> execute() {
+        throw new RuntimeException("not supported"); // TODO: figure out what to do with this
+    }
+
     private REQUEST proprietaryRequest;
 
     public DefaultPlcProprietaryRequest(REQUEST proprietaryRequest) {
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcProprietaryResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcProprietaryResponse.java
index 679fde1..69ee2d0 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcProprietaryResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcProprietaryResponse.java
@@ -18,13 +18,13 @@
  */
 package org.apache.plc4x.java.base.messages;
 
-public class DefaultPlcProprietaryResponse<REQUEST, RESPONSE> implements InternalPlcProprietaryResponse<REQUEST, RESPONSE> {
+public class DefaultPlcProprietaryResponse<RESPONSE> implements InternalPlcProprietaryResponse<RESPONSE> {
 
-    private final InternalPlcProprietaryRequest<REQUEST> plcProprietaryRequest;
+    private final InternalPlcProprietaryRequest plcProprietaryRequest;
 
     private final RESPONSE proprietaryResponse;
 
-    public DefaultPlcProprietaryResponse(InternalPlcProprietaryRequest<REQUEST> plcProprietaryRequest, RESPONSE proprietaryResponse) {
+    public DefaultPlcProprietaryResponse(InternalPlcProprietaryRequest plcProprietaryRequest, RESPONSE proprietaryResponse) {
         this.plcProprietaryRequest = plcProprietaryRequest;
         this.proprietaryResponse = proprietaryResponse;
     }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcReadRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcReadRequest.java
index 9b60fe4..d835bc0 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcReadRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcReadRequest.java
@@ -19,23 +19,33 @@ under the License.
 package org.apache.plc4x.java.base.messages;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.plc4x.java.api.connection.PlcReader;
 import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
 import org.apache.plc4x.java.api.model.PlcField;
 import org.apache.plc4x.java.base.connection.PlcFieldHandler;
 
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
 public class DefaultPlcReadRequest implements InternalPlcReadRequest, InternalPlcFieldRequest {
 
+    private final PlcReader reader;
     private LinkedHashMap<String, PlcField> fields;
 
-    protected DefaultPlcReadRequest(LinkedHashMap<String, PlcField> fields) {
+    protected DefaultPlcReadRequest(PlcReader reader, LinkedHashMap<String, PlcField> fields) {
+        this.reader = reader;
         this.fields = fields;
     }
 
     @Override
+    public CompletableFuture<PlcReadResponse> execute() {
+        return reader.read(this);
+    }
+
+    @Override
     public int getNumberOfFields() {
         return fields.size();
     }
@@ -64,12 +74,18 @@ public class DefaultPlcReadRequest implements InternalPlcReadRequest, InternalPl
             .collect(Collectors.toCollection(LinkedList::new));
     }
 
+    protected PlcReader getReader() {
+        return reader;
+    }
+
     public static class Builder implements PlcReadRequest.Builder {
 
+        private final PlcReader reader;
         private final PlcFieldHandler fieldHandler;
         private final Map<String, String> fields;
 
-        public Builder(PlcFieldHandler fieldHandler) {
+        public Builder(PlcReader reader, PlcFieldHandler fieldHandler) {
+            this.reader = reader;
             this.fieldHandler = fieldHandler;
             fields = new TreeMap<>();
         }
@@ -90,7 +106,7 @@ public class DefaultPlcReadRequest implements InternalPlcReadRequest, InternalPl
                 PlcField parsedField = fieldHandler.createField(fieldQuery);
                 parsedFields.put(name, parsedField);
             });
-            return new DefaultPlcReadRequest(parsedFields);
+            return new DefaultPlcReadRequest(reader, parsedFields);
         }
 
     }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
index 3194cf5..35bf369 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
@@ -20,7 +20,9 @@ package org.apache.plc4x.java.base.messages;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.plc4x.java.api.connection.PlcSubscriber;
 import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
 import org.apache.plc4x.java.api.model.PlcField;
 import org.apache.plc4x.java.api.types.PlcSubscriptionType;
 import org.apache.plc4x.java.base.connection.PlcFieldHandler;
@@ -28,11 +30,23 @@ import org.apache.plc4x.java.base.messages.items.FieldItem;
 
 import java.time.Duration;
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.BiFunction;
 
 // TODO: request broken needs finishing.
 public class DefaultPlcSubscriptionRequest implements InternalPlcSubscriptionRequest, InternalPlcFieldRequest {
 
+    private final PlcSubscriber subscriber;
+
+    public DefaultPlcSubscriptionRequest(PlcSubscriber subscriber) {
+        this.subscriber = subscriber;
+    }
+
+    @Override
+    public CompletableFuture<PlcSubscriptionResponse> execute() {
+        return subscriber.subscribe(this);
+    }
+
     @Override
     public int getNumberOfFields() {
         throw new IllegalStateException("not available");
@@ -65,10 +79,12 @@ public class DefaultPlcSubscriptionRequest implements InternalPlcSubscriptionReq
 
     public static class Builder implements PlcSubscriptionRequest.Builder {
 
+        private final PlcSubscriber subscriber;
         private final PlcFieldHandler fieldHandler;
         private final Map<String, BuilderItem<Object>> fields;
 
-        public Builder(PlcFieldHandler fieldHandler) {
+        public Builder(PlcSubscriber subscriber, PlcFieldHandler fieldHandler) {
+            this.subscriber = subscriber;
             this.fieldHandler = fieldHandler;
             fields = new TreeMap<>();
         }
@@ -99,7 +115,7 @@ public class DefaultPlcSubscriptionRequest implements InternalPlcSubscriptionReq
                 FieldItem fieldItem = builderItem.encoder.apply(parsedField, null);
                 parsedFields.put(name, new ImmutablePair<>(parsedField, fieldItem));
             });
-            return new DefaultPlcSubscriptionRequest();
+            return new DefaultPlcSubscriptionRequest(subscriber);
         }
 
         private static class BuilderItem<T> {
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionRequest.java
index 03e1f3e..45300a5 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionRequest.java
@@ -19,24 +19,35 @@
 package org.apache.plc4x.java.base.messages;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.plc4x.java.api.connection.PlcSubscriber;
 import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionResponse;
 import org.apache.plc4x.java.api.model.PlcField;
 import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
 import org.apache.plc4x.java.base.model.InternalPlcSubscriptionHandle;
 
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
 // TODO: request broken needs finishing.
 public class DefaultPlcUnsubscriptionRequest implements InternalPlcUnsubscriptionRequest, InternalPlcFieldRequest {
 
+    private final PlcSubscriber subscriber;
+
     private final Collection<? extends InternalPlcSubscriptionHandle> internalPlcSubscriptionHandles;
 
-    public DefaultPlcUnsubscriptionRequest(Collection<? extends InternalPlcSubscriptionHandle> internalPlcSubscriptionHandles) {
+    public DefaultPlcUnsubscriptionRequest(PlcSubscriber subscriber, Collection<? extends InternalPlcSubscriptionHandle> internalPlcSubscriptionHandles) {
+        this.subscriber = subscriber;
         this.internalPlcSubscriptionHandles = internalPlcSubscriptionHandles;
     }
 
     @Override
+    public CompletableFuture<PlcUnsubscriptionResponse> execute() {
+        return subscriber.unsubscribe(this);
+    }
+
+    @Override
     public int getNumberOfFields() {
         throw new IllegalStateException("not available");
     }
@@ -68,9 +79,11 @@ public class DefaultPlcUnsubscriptionRequest implements InternalPlcUnsubscriptio
 
     public static class Builder implements PlcUnsubscriptionRequest.Builder {
 
+        private final PlcSubscriber subscriber;
         private List<InternalPlcSubscriptionHandle> plcSubscriptionHandles;
 
-        public Builder() {
+        public Builder(PlcSubscriber subscriber) {
+            this.subscriber = subscriber;
             plcSubscriptionHandles = new ArrayList<>();
         }
 
@@ -94,7 +107,7 @@ public class DefaultPlcUnsubscriptionRequest implements InternalPlcUnsubscriptio
 
         @Override
         public PlcUnsubscriptionRequest build() {
-            return new DefaultPlcUnsubscriptionRequest(plcSubscriptionHandles);
+            return new DefaultPlcUnsubscriptionRequest(subscriber, plcSubscriptionHandles);
         }
 
 
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcWriteRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcWriteRequest.java
index c78ff08..d475b83 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcWriteRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcWriteRequest.java
@@ -21,8 +21,10 @@ package org.apache.plc4x.java.base.messages;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.lang3.tuple.Triple;
+import org.apache.plc4x.java.api.connection.PlcWriter;
 import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
 import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.java.api.messages.PlcWriteResponse;
 import org.apache.plc4x.java.api.model.PlcField;
 import org.apache.plc4x.java.base.connection.PlcFieldHandler;
 import org.apache.plc4x.java.base.messages.items.FieldItem;
@@ -33,18 +35,26 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 
 public class DefaultPlcWriteRequest implements InternalPlcWriteRequest, InternalPlcFieldRequest {
 
+    private final PlcWriter writer;
     private final LinkedHashMap<String, Pair<PlcField, FieldItem>> fields;
 
-    protected DefaultPlcWriteRequest(LinkedHashMap<String, Pair<PlcField, FieldItem>> fields) {
+    protected DefaultPlcWriteRequest(PlcWriter writer, LinkedHashMap<String, Pair<PlcField, FieldItem>> fields) {
+        this.writer = writer;
         this.fields = fields;
     }
 
     @Override
+    public CompletableFuture<PlcWriteResponse> execute() {
+        return writer.write(this);
+    }
+
+    @Override
     public int getNumberOfFields() {
         return fields.size();
     }
@@ -106,11 +116,13 @@ public class DefaultPlcWriteRequest implements InternalPlcWriteRequest, Internal
 
     public static class Builder implements PlcWriteRequest.Builder {
 
+        private final PlcWriter writer;
         private final PlcFieldHandler fieldHandler;
         private final Map<String, BuilderItem<Object>> fields;
         private final Map<Class<?>, BiFunction<PlcField, Object[], FieldItem>> handlerMap;
 
-        public Builder(PlcFieldHandler fieldHandler) {
+        public Builder(PlcWriter writer, PlcFieldHandler fieldHandler) {
+            this.writer = writer;
             this.fieldHandler = fieldHandler;
             fields = new TreeMap<>();
             handlerMap = new HashMap<>();
@@ -236,7 +248,7 @@ public class DefaultPlcWriteRequest implements InternalPlcWriteRequest, Internal
                 FieldItem fieldItem = builderItem.encoder.apply(parsedField, builderItem.values);
                 parsedFields.put(name, new ImmutablePair<>(parsedField, fieldItem));
             });
-            return new DefaultPlcWriteRequest(parsedFields);
+            return new DefaultPlcWriteRequest(writer, parsedFields);
         }
 
         private Builder addItem(String name, String fieldQuery, Object[] values, BiFunction<PlcField, Object[], FieldItem> encoder) {
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcFieldResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcFieldResponse.java
index 8486388..78f6320 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcFieldResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcFieldResponse.java
@@ -20,7 +20,7 @@ package org.apache.plc4x.java.base.messages;
 
 import org.apache.plc4x.java.api.messages.PlcFieldResponse;
 
-public interface InternalPlcFieldResponse<REQUEST_TYPE extends InternalPlcFieldRequest> extends PlcFieldResponse<REQUEST_TYPE> {
+public interface InternalPlcFieldResponse extends PlcFieldResponse {
 
 
 }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcProprietaryResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcProprietaryResponse.java
index eb5aa3b..1bbea2f 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcProprietaryResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcProprietaryResponse.java
@@ -20,5 +20,5 @@ package org.apache.plc4x.java.base.messages;
 
 import org.apache.plc4x.java.api.messages.PlcProprietaryResponse;
 
-public interface InternalPlcProprietaryResponse<REQUEST, RESPONSE> extends PlcProprietaryResponse<InternalPlcProprietaryRequest<REQUEST>, RESPONSE>, InternalPlcResponse<InternalPlcProprietaryRequest<REQUEST>> {
+public interface InternalPlcProprietaryResponse<RESPONSE> extends PlcProprietaryResponse<RESPONSE>, InternalPlcResponse {
 }
\ No newline at end of file
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcReadResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcReadResponse.java
index 775a4c7..53cf0b5 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcReadResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcReadResponse.java
@@ -25,7 +25,7 @@ import org.apache.plc4x.java.base.messages.items.FieldItem;
 
 import java.util.Map;
 
-public interface InternalPlcReadResponse extends PlcReadResponse<InternalPlcReadRequest>, InternalPlcResponse<InternalPlcReadRequest> {
+public interface InternalPlcReadResponse extends PlcReadResponse, InternalPlcResponse {
 
     Map<String, Pair<PlcResponseCode, FieldItem>> getValues();
 }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcResponse.java
index 95c9223..dde16f0 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcResponse.java
@@ -20,5 +20,6 @@ package org.apache.plc4x.java.base.messages;
 
 import org.apache.plc4x.java.api.messages.PlcResponse;
 
-public interface InternalPlcResponse<REQUEST_TYPE extends InternalPlcRequest> extends PlcResponse<REQUEST_TYPE> {
+public interface InternalPlcResponse extends PlcResponse {
+
 }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcWriteResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcWriteResponse.java
index dbcdc5c..06d4ad7 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcWriteResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcWriteResponse.java
@@ -23,6 +23,6 @@ import org.apache.plc4x.java.api.types.PlcResponseCode;
 
 import java.util.Map;
 
-public interface InternalPlcWriteResponse extends PlcWriteResponse<InternalPlcWriteRequest>, InternalPlcResponse<InternalPlcWriteRequest> {
+public interface InternalPlcWriteResponse extends PlcWriteResponse, InternalPlcResponse {
     Map<String, PlcResponseCode> getValues();
 }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
index a9c4a63..3b13a88 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
@@ -25,6 +25,8 @@ import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.PromiseCombiner;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.lang3.tuple.Triple;
+import org.apache.plc4x.java.api.connection.PlcReader;
+import org.apache.plc4x.java.api.connection.PlcWriter;
 import org.apache.plc4x.java.api.exceptions.PlcProtocolException;
 import org.apache.plc4x.java.api.exceptions.PlcTimeoutException;
 import org.apache.plc4x.java.api.model.PlcField;
@@ -49,26 +51,29 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
 
     private final Timer timer;
 
+    private final PlcReader reader;
+    private final PlcWriter writer;
+
     // TODO: maybe better get from map
     private long defaultReceiveTimeout;
 
     private PendingWriteQueue queue;
 
-    private ConcurrentMap<PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>, Timeout> scheduledTimeouts;
+    private ConcurrentMap<PlcRequestContainer<InternalPlcRequest, InternalPlcResponse>, Timeout> scheduledTimeouts;
 
     // Map to track send subcontainers
-    private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>> sentButUnacknowledgedSubContainer;
+    private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse>> sentButUnacknowledgedSubContainer;
 
     // Map to map tdpu to original parent container
     // TODO: currently this could be supplied via param, only reason to keep would be for statistics.
-    private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>> correlationToParentContainer;
+    private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse>> correlationToParentContainer;
 
     // Map to track tdpus per container
     // TODO: currently this could be supplied via param, only reason to keep would be for statistics.
     private ConcurrentMap<PlcRequestContainer<?, ?>, Set<Integer>> containerCorrelationIdMap;
 
     // Map to track a list of responses per parent container
-    private ConcurrentMap<PlcRequestContainer<?, ?>, Queue<InternalPlcResponse<?>>> responsesToBeDelivered;
+    private ConcurrentMap<PlcRequestContainer<?, ?>, Queue<InternalPlcResponse>> responsesToBeDelivered;
 
     private AtomicInteger correlationIdGenerator;
 
@@ -81,15 +86,17 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
 
     private AtomicLong erroredItems;
 
-    public SingleItemToSingleRequestProtocol(Timer timer) {
-        this(timer, true);
+    public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, Timer timer) {
+        this(reader, writer, timer, true);
     }
 
-    public SingleItemToSingleRequestProtocol(Timer timer, boolean betterImplementationPossible) {
-        this(timer, TimeUnit.SECONDS.toMillis(30), betterImplementationPossible);
+    public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, Timer timer, boolean betterImplementationPossible) {
+        this(reader, writer, timer, TimeUnit.SECONDS.toMillis(30), betterImplementationPossible);
     }
 
-    public SingleItemToSingleRequestProtocol(Timer timer, long defaultReceiveTimeout, boolean betterImplementationPossible) {
+    public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, Timer timer, long defaultReceiveTimeout, boolean betterImplementationPossible) {
+        this.reader = reader;
+        this.writer = writer;
         this.timer = timer;
         this.defaultReceiveTimeout = defaultReceiveTimeout;
         if (betterImplementationPossible) {
@@ -155,16 +162,16 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
     // Decoding
     ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
-    protected void tryFinish(Integer currentTdpu, InternalPlcResponse msg, CompletableFuture<InternalPlcResponse<?>> originalResponseFuture) {
+    protected void tryFinish(Integer currentTdpu, InternalPlcResponse msg, CompletableFuture<InternalPlcResponse> originalResponseFuture) {
         deliveredItems.incrementAndGet();
-        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> subPlcRequestContainer = sentButUnacknowledgedSubContainer.remove(currentTdpu);
+        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse> subPlcRequestContainer = sentButUnacknowledgedSubContainer.remove(currentTdpu);
         LOGGER.info("{} got acknowledged", subPlcRequestContainer);
-        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> originalPlcRequestContainer = correlationToParentContainer.remove(currentTdpu);
+        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse> originalPlcRequestContainer = correlationToParentContainer.remove(currentTdpu);
         if (originalPlcRequestContainer == null) {
             LOGGER.warn("Unrelated package received {}", msg);
             return;
         }
-        Queue<InternalPlcResponse<?>> correlatedResponseItems = responsesToBeDelivered.computeIfAbsent(originalPlcRequestContainer, ignore -> new ConcurrentLinkedQueue<>());
+        Queue<InternalPlcResponse> correlatedResponseItems = responsesToBeDelivered.computeIfAbsent(originalPlcRequestContainer, ignore -> new ConcurrentLinkedQueue<>());
         correlatedResponseItems.add(msg);
         Set<Integer> integers = containerCorrelationIdMap.get(originalPlcRequestContainer);
         integers.remove(currentTdpu);
@@ -175,7 +182,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
                 timeout.cancel();
             }
 
-            InternalPlcResponse<?> plcResponse;
+            InternalPlcResponse plcResponse;
             if (originalPlcRequestContainer.getRequest() instanceof InternalPlcReadRequest) {
                 InternalPlcReadRequest internalPlcReadRequest = (InternalPlcReadRequest) originalPlcRequestContainer.getRequest();
                 HashMap<String, Pair<PlcResponseCode, FieldItem>> fields = new HashMap<>();
@@ -206,13 +213,13 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
         }
     }
 
-    protected void errored(Integer currentTdpu, Throwable throwable, CompletableFuture<InternalPlcResponse<?>> originalResponseFuture) {
+    protected void errored(Integer currentTdpu, Throwable throwable, CompletableFuture<InternalPlcResponse> originalResponseFuture) {
         erroredItems.incrementAndGet();
-        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> subPlcRequestContainer = sentButUnacknowledgedSubContainer.remove(currentTdpu);
+        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse> subPlcRequestContainer = sentButUnacknowledgedSubContainer.remove(currentTdpu);
         LOGGER.info("{} got errored", subPlcRequestContainer);
 
 
-        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> originalPlcRequestContainer = correlationToParentContainer.remove(currentTdpu);
+        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse> originalPlcRequestContainer = correlationToParentContainer.remove(currentTdpu);
         if (originalPlcRequestContainer == null) {
             LOGGER.warn("Unrelated error received tdpu:{}", currentTdpu, throwable);
         } else {
@@ -246,7 +253,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
         if (msg instanceof PlcRequestContainer) {
             @SuppressWarnings("unchecked")
-            PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> in = (PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>) msg;
+            PlcRequestContainer<InternalPlcRequest, InternalPlcResponse> in = (PlcRequestContainer<InternalPlcRequest, InternalPlcResponse>) msg;
             Set<Integer> tdpus = containerCorrelationIdMap.computeIfAbsent(in, plcRequestContainer -> ConcurrentHashMap.newKeySet());
 
             Timeout timeout = timer.newTimeout(timeout_ -> handleTimeout(timeout_, in, tdpus, System.nanoTime()), defaultReceiveTimeout, TimeUnit.MILLISECONDS);
@@ -275,7 +282,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
                                     tryFinish(tdpu, internalPlcResponse, in.getResponseFuture());
                                 }
                             });
-                        PlcRequestContainer<CorrelatedPlcReadRequest, InternalPlcResponse> correlatedPlcRequestContainer = new PlcRequestContainer<>(CorrelatedPlcReadRequest.of(field, tdpu), correlatedCompletableFuture);
+                        PlcRequestContainer<CorrelatedPlcReadRequest, InternalPlcResponse> correlatedPlcRequestContainer = new PlcRequestContainer<>(CorrelatedPlcReadRequest.of(reader, field, tdpu), correlatedCompletableFuture);
                         correlationToParentContainer.put(tdpu, in);
                         queue.add(correlatedPlcRequestContainer, subPromise);
                         if (!tdpus.add(tdpu)) {
@@ -298,7 +305,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
                                     tryFinish(tdpu, internalPlcResponse, in.getResponseFuture());
                                 }
                             });
-                        PlcRequestContainer<CorrelatedPlcWriteRequest, InternalPlcResponse> correlatedPlcRequestContainer = new PlcRequestContainer<>(CorrelatedPlcWriteRequest.of(fieldItemTriple, tdpu), correlatedCompletableFuture);
+                        PlcRequestContainer<CorrelatedPlcWriteRequest, InternalPlcResponse> correlatedPlcRequestContainer = new PlcRequestContainer<>(CorrelatedPlcWriteRequest.of(writer, fieldItemTriple, tdpu), correlatedCompletableFuture);
                         correlationToParentContainer.put(tdpu, in);
                         queue.add(correlatedPlcRequestContainer, subPromise);
                         if (!tdpus.add(tdpu)) {
@@ -360,7 +367,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
         ctx.flush();
     }
 
-    private void handleTimeout(Timeout timeout, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> in, Set<Integer> tdpus, long scheduledAt) {
+    private void handleTimeout(Timeout timeout, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse> in, Set<Integer> tdpus, long scheduledAt) {
         if (timeout.isCancelled()) {
             LOGGER.debug("container {} with timeout {} got canceled", in, timeout);
             return;
@@ -386,15 +393,15 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
 
         protected final int tdpu;
 
-        protected CorrelatedPlcReadRequest(LinkedHashMap<String, PlcField> fields, int tdpu) {
-            super(fields);
+        protected CorrelatedPlcReadRequest(PlcReader reader, LinkedHashMap<String, PlcField> fields, int tdpu) {
+            super(reader, fields);
             this.tdpu = tdpu;
         }
 
-        protected static CorrelatedPlcReadRequest of(Pair<String, PlcField> stringPlcFieldPair, int tdpu) {
+        protected static CorrelatedPlcReadRequest of(PlcReader reader, Pair<String, PlcField> stringPlcFieldPair, int tdpu) {
             LinkedHashMap<String, PlcField> fields = new LinkedHashMap<>();
             fields.put(stringPlcFieldPair.getKey(), stringPlcFieldPair.getValue());
-            return new CorrelatedPlcReadRequest(fields, tdpu);
+            return new CorrelatedPlcReadRequest(reader, fields, tdpu);
         }
 
         @Override
@@ -407,15 +414,15 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
 
         private final int tdpu;
 
-        public CorrelatedPlcWriteRequest(LinkedHashMap<String, Pair<PlcField, FieldItem>> fields, int tdpu) {
-            super(fields);
+        public CorrelatedPlcWriteRequest(PlcWriter writer, LinkedHashMap<String, Pair<PlcField, FieldItem>> fields, int tdpu) {
+            super(writer, fields);
             this.tdpu = tdpu;
         }
 
-        public static CorrelatedPlcWriteRequest of(Triple<String, PlcField, FieldItem> fieldItemTriple, int tdpu) {
+        public static CorrelatedPlcWriteRequest of(PlcWriter writer, Triple<String, PlcField, FieldItem> fieldItemTriple, int tdpu) {
             LinkedHashMap<String, Pair<PlcField, FieldItem>> fields = new LinkedHashMap<>();
             fields.put(fieldItemTriple.getLeft(), Pair.of(fieldItemTriple.getMiddle(), fieldItemTriple.getRight()));
-            return new CorrelatedPlcWriteRequest(fields, tdpu);
+            return new CorrelatedPlcWriteRequest(writer, fields, tdpu);
         }
 
         @Override
diff --git a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
index 47d06c5..c97d2a6 100644
--- a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
+++ b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
@@ -25,6 +25,9 @@ import io.netty.channel.PendingWriteQueue;
 import io.netty.util.HashedWheelTimer;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.plc4x.java.api.connection.PlcReader;
+import org.apache.plc4x.java.api.connection.PlcSubscriber;
+import org.apache.plc4x.java.api.connection.PlcWriter;
 import org.apache.plc4x.java.api.messages.PlcFieldRequest;
 import org.apache.plc4x.java.api.model.PlcField;
 import org.apache.plc4x.java.api.types.PlcResponseCode;
@@ -52,8 +55,18 @@ import static org.mockito.Mockito.*;
 @ExtendWith(MockitoExtension.class)
 class SingleItemToSingleRequestProtocolTest implements WithAssertions {
 
+    PlcReader mockReader = null;
+    PlcWriter mockWriter = null;
+    PlcSubscriber mockSubscriber = null;
+
     @InjectMocks
-    SingleItemToSingleRequestProtocol SUT = new SingleItemToSingleRequestProtocol(new HashedWheelTimer(), TimeUnit.SECONDS.toMillis(1), false);
+    SingleItemToSingleRequestProtocol SUT = new SingleItemToSingleRequestProtocol(
+        mockReader,
+        mockWriter,
+        new HashedWheelTimer(),
+        TimeUnit.SECONDS.toMillis(1),
+        false
+    );
 
     @Mock(answer = Answers.RETURNS_DEEP_STUBS)
     ChannelHandlerContext channelHandlerContext;
@@ -138,7 +151,7 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
         void simpleRead() throws Exception {
             // Given
             // we have a simple read
-            PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(), responseCompletableFuture);
+            PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
             // When
             // we write this
             SUT.write(channelHandlerContext, msg, channelPromise);
@@ -169,7 +182,7 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
         void partialRead() throws Exception {
             // Given
             // we have a simple read
-            PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(), responseCompletableFuture);
+            PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
             // When
             // we write this
             SUT.write(channelHandlerContext, msg, channelPromise);
@@ -202,7 +215,7 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
         void partialReadOneErrored() throws Exception {
             // Given
             // we have a simple read
-            PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(), responseCompletableFuture);
+            PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
             // When
             // we write this
             SUT.write(channelHandlerContext, msg, channelPromise);
@@ -241,7 +254,7 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
         void noRead() throws Exception {
             // Given
             // we have a simple read
-            PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(), responseCompletableFuture);
+            PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
             // When
             // we write this
             SUT.write(channelHandlerContext, msg, channelPromise);
@@ -314,7 +327,7 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
         @Test
         void read() throws Exception {
             // Given
-            PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(), responseCompletableFuture);
+            PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
             // When
             SUT.write(channelHandlerContext, msg, channelPromise);
             // Then
@@ -345,7 +358,7 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
         @Test
         void write() throws Exception {
             // Given
-            PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcWriteRequest.build(), responseCompletableFuture);
+            PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcWriteRequest.build(mockWriter), responseCompletableFuture);
             // When
             SUT.write(channelHandlerContext, msg, channelPromise);
             // Then
@@ -395,48 +408,51 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
     }
 
     private static class TestDefaultPlcReadRequest extends DefaultPlcReadRequest {
-
-        private TestDefaultPlcReadRequest(LinkedHashMap<String, PlcField> fields) {
-            super(fields);
+        private TestDefaultPlcReadRequest(PlcReader reader, LinkedHashMap<String, PlcField> fields) {
+            super(reader, fields);
         }
 
-        private static TestDefaultPlcReadRequest build() {
+        private static TestDefaultPlcReadRequest build(PlcReader reader) {
             LinkedHashMap<String, PlcField> fields = new LinkedHashMap<>();
             IntStream.rangeClosed(1, 5).forEach(i -> fields.put("readField" + i, mock(PlcField.class)));
-            return new TestDefaultPlcReadRequest(fields);
+            return new TestDefaultPlcReadRequest(reader, fields);
         }
     }
 
     private static class TestDefaultPlcWriteRequest extends DefaultPlcWriteRequest {
 
-        private TestDefaultPlcWriteRequest(LinkedHashMap<String, Pair<PlcField, FieldItem>> fields) {
-            super(fields);
+        private TestDefaultPlcWriteRequest(PlcWriter writer, LinkedHashMap<String, Pair<PlcField, FieldItem>> fields) {
+            super(writer, fields);
         }
 
-        private static TestDefaultPlcWriteRequest build() {
+        private static TestDefaultPlcWriteRequest build(PlcWriter writer) {
             LinkedHashMap<String, Pair<PlcField, FieldItem>> fields = new LinkedHashMap<>();
             IntStream.rangeClosed(1, 5).forEach(i -> fields.put("writeField" + i, Pair.of(mock(PlcField.class), mock(FieldItem.class))));
-            return new TestDefaultPlcWriteRequest(fields);
+            return new TestDefaultPlcWriteRequest(writer, fields);
         }
     }
 
     private static class TestDefaultPlcSubscriptionRequest extends DefaultPlcSubscriptionRequest {
 
-        private static TestDefaultPlcSubscriptionRequest build() {
+        private TestDefaultPlcSubscriptionRequest(PlcSubscriber subscriber) {
+            super(subscriber);
+        }
+
+        private static TestDefaultPlcSubscriptionRequest build(PlcSubscriber subscriber) {
             // TODO: implement me once available
-            return new TestDefaultPlcSubscriptionRequest();
+            return new TestDefaultPlcSubscriptionRequest(subscriber);
         }
     }
 
     private static class TestDefaultPlcUnsubscriptionRequest extends DefaultPlcUnsubscriptionRequest {
 
-        private TestDefaultPlcUnsubscriptionRequest(Collection<? extends InternalPlcSubscriptionHandle> internalPlcSubscriptionHandles) {
-            super(internalPlcSubscriptionHandles);
+        private TestDefaultPlcUnsubscriptionRequest(PlcSubscriber subscriber, Collection<? extends InternalPlcSubscriptionHandle> internalPlcSubscriptionHandles) {
+            super(subscriber, internalPlcSubscriptionHandles);
         }
 
-        private static TestDefaultPlcUnsubscriptionRequest build() {
+        private static TestDefaultPlcUnsubscriptionRequest build(PlcSubscriber subscriber) {
             // TODO: implement me once available
-            return new TestDefaultPlcUnsubscriptionRequest(Collections.emptyList());
+            return new TestDefaultPlcUnsubscriptionRequest(subscriber, Collections.emptyList());
         }
     }
 }
\ No newline at end of file