You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2018/10/27 12:43:14 UTC

[incubator-plc4x] branch master updated: [driver-bases] re-enabled subscription support on SingleItemToSingleRequestProtocol un-sub still needs to be re-enabling

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git


The following commit(s) were added to refs/heads/master by this push:
     new 342fb75  [driver-bases] re-enabled subscription support on SingleItemToSingleRequestProtocol un-sub still needs to be re-enabling
342fb75 is described below

commit 342fb753674310df157046ed5b253b560e50f81c
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Sat Oct 27 14:40:40 2018 +0200

    [driver-bases] re-enabled subscription support on SingleItemToSingleRequestProtocol
    un-sub still needs to be re-enabling
---
 .../ads/connection/AdsSerialPlcConnection.java     |   5 +-
 .../java/ads/connection/AdsTcpPlcConnection.java   |   2 +-
 .../messages/DefaultPlcSubscriptionRequest.java    |  10 +-
 .../messages/DefaultPlcSubscriptionResponse.java   |   4 +
 .../base/messages/InternalPlcFieldRequest.java     |   2 +-
 .../messages/InternalPlcSubscriptionRequest.java   |   3 +
 .../messages/InternalPlcSubscriptionResponse.java  |   7 +-
 .../SingleItemToSingleRequestProtocol.java         |  73 +++-
 .../SingleItemToSingleRequestProtocolTest.java     | 393 ++++++++++++---------
 .../modbus/connection/ModbusTcpPlcConnection.java  |   5 +-
 10 files changed, 329 insertions(+), 175 deletions(-)

diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsSerialPlcConnection.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsSerialPlcConnection.java
index 3d4c8ba..d9094fa 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsSerialPlcConnection.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsSerialPlcConnection.java
@@ -28,12 +28,9 @@ import org.apache.plc4x.java.ads.protocol.Ads2PayloadProtocol;
 import org.apache.plc4x.java.ads.protocol.Payload2SerialProtocol;
 import org.apache.plc4x.java.ads.protocol.Plc4x2AdsProtocol;
 import org.apache.plc4x.java.ads.protocol.util.SingleMessageRateLimiter;
-import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
-import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
 import org.apache.plc4x.java.base.connection.SerialChannelFactory;
 import org.apache.plc4x.java.base.protocol.SingleItemToSingleRequestProtocol;
 
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 public class AdsSerialPlcConnection extends AdsAbstractPlcConnection {
@@ -65,7 +62,7 @@ public class AdsSerialPlcConnection extends AdsAbstractPlcConnection {
                 pipeline.addLast(new SingleMessageRateLimiter());
                 pipeline.addLast(new Ads2PayloadProtocol());
                 pipeline.addLast(new Plc4x2AdsProtocol(targetAmsNetId, targetAmsPort, sourceAmsNetId, sourceAmsPort, fieldMapping));
-                pipeline.addLast(new SingleItemToSingleRequestProtocol(AdsSerialPlcConnection.this, AdsSerialPlcConnection.this, timer));
+                pipeline.addLast(new SingleItemToSingleRequestProtocol(AdsSerialPlcConnection.this, AdsSerialPlcConnection.this, null, timer));
             }
         };
     }
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
index 06aa579..cbd5a5d 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
@@ -117,7 +117,7 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc
                 pipeline.addLast(new Payload2TcpProtocol());
                 pipeline.addLast(new Ads2PayloadProtocol());
                 pipeline.addLast(new Plc4x2AdsProtocol(targetAmsNetId, targetAmsPort, sourceAmsNetId, sourceAmsPort, fieldMapping));
-                pipeline.addLast(new SingleItemToSingleRequestProtocol(AdsTcpPlcConnection.this, AdsTcpPlcConnection.this, timer));
+                pipeline.addLast(new SingleItemToSingleRequestProtocol(AdsTcpPlcConnection.this, AdsTcpPlcConnection.this, AdsTcpPlcConnection.this, timer));
             }
         };
     }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
index 599e4ad..13392ae 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
@@ -85,7 +85,15 @@ public class DefaultPlcSubscriptionRequest implements InternalPlcSubscriptionReq
     public LinkedList<Pair<String, PlcField>> getNamedFields() {
         return fields.entrySet()
             .stream()
-            .map(stringPlcFieldEntry -> Pair.of(stringPlcFieldEntry.getKey(), (PlcField) stringPlcFieldEntry.getValue()))
+            .map(stringPlcFieldEntry -> Pair.of(stringPlcFieldEntry.getKey(), stringPlcFieldEntry.getValue().getPlcField()))
+            .collect(Collectors.toCollection(LinkedList::new));
+    }
+
+    @Override
+    public LinkedList<Pair<String, SubscriptionPlcField>> getNamedSubscriptionFields() {
+        return fields.entrySet()
+            .stream()
+            .map(stringPlcFieldEntry -> Pair.of(stringPlcFieldEntry.getKey(), stringPlcFieldEntry.getValue()))
             .collect(Collectors.toCollection(LinkedList::new));
     }
 
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionResponse.java
index 47a5ce2..3a2a0cb 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionResponse.java
@@ -82,4 +82,8 @@ public class DefaultPlcSubscriptionResponse implements InternalPlcSubscriptionRe
         return values.values().stream().map(Pair::getValue).collect(Collectors.toList());
     }
 
+    @Override
+    public Map<String, Pair<PlcResponseCode, PlcSubscriptionHandle>> getValues() {
+        return values;
+    }
 }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcFieldRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcFieldRequest.java
index a7ee30c..30c440e 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcFieldRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcFieldRequest.java
@@ -24,7 +24,7 @@ import org.apache.plc4x.java.api.model.PlcField;
 
 import java.util.LinkedList;
 
-public interface InternalPlcFieldRequest extends PlcFieldRequest {
+public interface InternalPlcFieldRequest extends PlcFieldRequest, InternalPlcRequest {
 
     LinkedList<Pair<String, PlcField>> getNamedFields();
 
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionRequest.java
index 6dcb30d..8901fef 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionRequest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.plc4x.java.base.messages;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
 import org.apache.plc4x.java.base.model.SubscriptionPlcField;
 
@@ -29,4 +30,6 @@ public interface InternalPlcSubscriptionRequest extends PlcSubscriptionRequest,
     LinkedList<SubscriptionPlcField> getSubscriptionFields();
 
     LinkedHashMap<String, SubscriptionPlcField> getSubscriptionPlcFieldMap();
+
+    LinkedList<Pair<String, SubscriptionPlcField>> getNamedSubscriptionFields();
 }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionResponse.java
index 86e19d6..561a99a 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionResponse.java
@@ -18,8 +18,13 @@
  */
 package org.apache.plc4x.java.base.messages;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
+import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
 
-public interface InternalPlcSubscriptionResponse extends PlcSubscriptionResponse {
+import java.util.Map;
 
+public interface InternalPlcSubscriptionResponse extends PlcSubscriptionResponse, InternalPlcResponse {
+    Map<String, Pair<PlcResponseCode, PlcSubscriptionHandle>> getValues();
 }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
index 7ef5bdc..22c210e 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
@@ -28,9 +28,11 @@ import org.apache.commons.lang3.tuple.Triple;
 import org.apache.plc4x.java.api.exceptions.PlcProtocolException;
 import org.apache.plc4x.java.api.exceptions.PlcTimeoutException;
 import org.apache.plc4x.java.api.model.PlcField;
+import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
 import org.apache.plc4x.java.api.types.PlcResponseCode;
 import org.apache.plc4x.java.base.messages.*;
 import org.apache.plc4x.java.base.messages.items.BaseDefaultFieldItem;
+import org.apache.plc4x.java.base.model.SubscriptionPlcField;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,8 +52,11 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
     private final Timer timer;
 
     private final PlcReader reader;
+
     private final PlcWriter writer;
 
+    private final PlcSubscriber subscriber;
+
     // TODO: maybe better get from map
     private long defaultReceiveTimeout;
 
@@ -84,17 +89,18 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
 
     private AtomicLong erroredItems;
 
-    public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, Timer timer) {
-        this(reader, writer, timer, true);
+    public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, PlcSubscriber subscriber, Timer timer) {
+        this(reader, writer, subscriber, timer, true);
     }
 
-    public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, Timer timer, boolean betterImplementationPossible) {
-        this(reader, writer, timer, TimeUnit.SECONDS.toMillis(30), betterImplementationPossible);
+    public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, PlcSubscriber subscriber, Timer timer, boolean betterImplementationPossible) {
+        this(reader, writer, subscriber, timer, TimeUnit.SECONDS.toMillis(30), betterImplementationPossible);
     }
 
-    public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, Timer timer, long defaultReceiveTimeout, boolean betterImplementationPossible) {
+    public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, PlcSubscriber subscriber, Timer timer, long defaultReceiveTimeout, boolean betterImplementationPossible) {
         this.reader = reader;
         this.writer = writer;
+        this.subscriber = subscriber;
         this.timer = timer;
         this.defaultReceiveTimeout = defaultReceiveTimeout;
         if (betterImplementationPossible) {
@@ -201,6 +207,16 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
                     .forEach(stringPairMap -> stringPairMap.forEach(values::put));
 
                 plcResponse = new DefaultPlcWriteResponse(internalPlcWriteRequest, values);
+            } else if (originalPlcRequestContainer.getRequest() instanceof InternalPlcSubscriptionRequest) {
+                InternalPlcSubscriptionRequest internalPlcSubscriptionRequest = (InternalPlcSubscriptionRequest) originalPlcRequestContainer.getRequest();
+                HashMap<String, Pair<PlcResponseCode, PlcSubscriptionHandle>> fields = new HashMap<>();
+
+                correlatedResponseItems.stream()
+                    .map(InternalPlcSubscriptionResponse.class::cast)
+                    .map(InternalPlcSubscriptionResponse::getValues)
+                    .forEach(stringPairMap -> stringPairMap.forEach(fields::put));
+
+                plcResponse = new DefaultPlcSubscriptionResponse(internalPlcSubscriptionRequest, fields);
             } else {
                 errored(currentTdpu, new PlcProtocolException("Unknown type detected " + originalPlcRequestContainer.getRequest().getClass()), originalResponseFuture);
                 return;
@@ -312,6 +328,32 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
                         promiseCombiner.add((Future) subPromise);
                     });
                     // TODO: add sub/unsub
+                } else if (internalPlcFieldRequest instanceof InternalPlcSubscriptionRequest) {
+                    InternalPlcSubscriptionRequest internalPlcSubscriptionRequest = (InternalPlcSubscriptionRequest) internalPlcFieldRequest;
+                    internalPlcSubscriptionRequest.getNamedSubscriptionFields().forEach(field -> {
+                        ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
+
+                        Integer tdpu = correlationIdGenerator.getAndIncrement();
+                        CompletableFuture<InternalPlcResponse> correlatedCompletableFuture = new CompletableFuture<>();
+                        // Important: don't chain to above as we want the above to be completed not the result of when complete
+                        correlatedCompletableFuture
+                            .thenApply(InternalPlcResponse.class::cast)
+                            .whenComplete((internalPlcResponse, throwable) -> {
+                                if (throwable != null) {
+                                    errored(tdpu, throwable, in.getResponseFuture());
+                                } else {
+                                    tryFinish(tdpu, internalPlcResponse, in.getResponseFuture());
+                                }
+                            });
+                        PlcRequestContainer<CorrelatedPlcSubscriptionRequest, InternalPlcResponse> correlatedPlcRequestContainer = new PlcRequestContainer<>(CorrelatedPlcSubscriptionRequest.of(subscriber, field, tdpu), correlatedCompletableFuture);
+                        correlationToParentContainer.put(tdpu, in);
+                        queue.add(correlatedPlcRequestContainer, subPromise);
+                        if (!tdpus.add(tdpu)) {
+                            throw new IllegalStateException("AtomicInteger should not create duplicated ids: " + tdpu);
+                        }
+                        promiseCombiner.add((Future) subPromise);
+                    });
+                    // TODO: add sub/unsub
                 } else {
                     throw new PlcProtocolException("Unmapped request type " + request.getClass());
                 }
@@ -429,6 +471,27 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
         }
     }
 
+    protected static class CorrelatedPlcSubscriptionRequest extends DefaultPlcSubscriptionRequest implements CorrelatedPlcRequest {
+
+        protected final int tdpu;
+
+        protected CorrelatedPlcSubscriptionRequest(PlcSubscriber subscriber, LinkedHashMap<String, SubscriptionPlcField> fields, int tdpu) {
+            super(subscriber, fields);
+            this.tdpu = tdpu;
+        }
+
+        protected static CorrelatedPlcSubscriptionRequest of(PlcSubscriber subscriber, Pair<String, SubscriptionPlcField> stringPlcFieldPair, int tdpu) {
+            LinkedHashMap<String, SubscriptionPlcField> fields = new LinkedHashMap<>();
+            fields.put(stringPlcFieldPair.getKey(), stringPlcFieldPair.getValue());
+            return new CorrelatedPlcSubscriptionRequest(subscriber, fields, tdpu);
+        }
+
+        @Override
+        public int getTdpu() {
+            return tdpu;
+        }
+    }
+
     // TODO: maybe export to jmx
     public Map<String, Number> getStatistics() {
         HashMap<String, Number> statistics = new HashMap<>();
diff --git a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
index ebf266f..fb85d2b 100644
--- a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
+++ b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
@@ -27,6 +27,7 @@ import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.plc4x.java.api.messages.PlcFieldRequest;
 import org.apache.plc4x.java.api.model.PlcField;
+import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
 import org.apache.plc4x.java.api.types.PlcResponseCode;
 import org.apache.plc4x.java.base.messages.*;
 import org.apache.plc4x.java.base.messages.items.BaseDefaultFieldItem;
@@ -38,7 +39,10 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.*;
+import org.mockito.Answers;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 import java.util.*;
@@ -53,18 +57,7 @@ import static org.mockito.Mockito.*;
 @ExtendWith(MockitoExtension.class)
 class SingleItemToSingleRequestProtocolTest implements WithAssertions {
 
-    PlcReader mockReader = null;
-    PlcWriter mockWriter = null;
-    PlcSubscriber mockSubscriber = null;
-
-    @InjectMocks
-    SingleItemToSingleRequestProtocol SUT = new SingleItemToSingleRequestProtocol(
-        mockReader,
-        mockWriter,
-        new HashedWheelTimer(),
-        TimeUnit.SECONDS.toMillis(1),
-        false
-    );
+    SingleItemToSingleRequestProtocol SUT;
 
     @Mock(answer = Answers.RETURNS_DEEP_STUBS)
     ChannelHandlerContext channelHandlerContext;
@@ -75,8 +68,23 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
     @Mock
     CompletableFuture<InternalPlcResponse> responseCompletableFuture;
 
+    @Mock
+    PlcReader mockReader;
+    @Mock
+    PlcWriter mockWriter;
+    @Mock
+    PlcSubscriber mockSubscriber;
+
     @BeforeEach
     void setUp() throws Exception {
+        SUT = new SingleItemToSingleRequestProtocol(
+            mockReader,
+            mockWriter,
+            mockSubscriber,
+            new HashedWheelTimer(),
+            TimeUnit.SECONDS.toMillis(1),
+            false
+        );
         SUT.channelRegistered(channelHandlerContext);
         when(channelHandlerContext.executor().inEventLoop()).thenReturn(true);
     }
@@ -142,152 +150,215 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
 
     @Nested
     class Roundtrip {
-        @Captor
-        ArgumentCaptor<PlcRequestContainer> plcRequestContainerArgumentCaptor;
-
-        @Test
-        void simpleRead() throws Exception {
-            // Given
-            // we have a simple read
-            PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
-            // When
-            // we write this
-            SUT.write(channelHandlerContext, msg, channelPromise);
-            // And
-            // and we simulate that all get responded
-            verify(channelHandlerContext, times(5)).write(plcRequestContainerArgumentCaptor.capture(), any());
-            List<PlcRequestContainer> capturedDownstreamContainers = plcRequestContainerArgumentCaptor.getAllValues();
-            capturedDownstreamContainers.forEach(this::produceReadResponse);
-            // Then
-            // our complete container should complete normally
-            verify(responseCompletableFuture).complete(any());
-            // And we should have no memory leak
-            assertThat(SUT.getStatistics()).containsOnly(
-                entry("queue", 0),
-                entry("sentButUnacknowledgedSubContainer", 0),
-                entry("correlationToParentContainer", 0),
-                entry("containerCorrelationIdMap", 0),
-                entry("responsesToBeDelivered", 0),
-                entry("correlationIdGenerator", 5),
-                entry("erroredItems", 0L),
-                entry("deliveredItems", 5L),
-                entry("deliveredContainers", 1L),
-                entry("erroredContainers", 0L)
-            );
-        }
-
-        @Test
-        void partialRead() throws Exception {
-            // Given
-            // we have a simple read
-            PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
-            // When
-            // we write this
-            SUT.write(channelHandlerContext, msg, channelPromise);
-            // And
-            // and we simulate that some one responded
-            verify(channelHandlerContext, times(5)).write(plcRequestContainerArgumentCaptor.capture(), any());
-            List<PlcRequestContainer> capturedDownstreamContainers = plcRequestContainerArgumentCaptor.getAllValues();
-            capturedDownstreamContainers.stream().findFirst().map(this::produceReadResponse);
-            // Then
-            // We create SUT with 1 seconds timeout
-            TimeUnit.SECONDS.sleep(2);
-            // our complete container should complete normally
-            verify(responseCompletableFuture).completeExceptionally(any());
-            // And we should have no memory leak
-            assertThat(SUT.getStatistics()).containsOnly(
-                entry("queue", 0),
-                entry("sentButUnacknowledgedSubContainer", 0),
-                entry("correlationToParentContainer", 0),
-                entry("containerCorrelationIdMap", 0),
-                entry("responsesToBeDelivered", 0),
-                entry("correlationIdGenerator", 5),
-                entry("deliveredItems", 1L),
-                entry("erroredItems", 4L),
-                entry("deliveredContainers", 0L),
-                entry("erroredContainers", 1L)
-            );
-        }
-
-        @Test
-        void partialReadOneErrored() throws Exception {
-            // Given
-            // we have a simple read
-            PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
-            // When
-            // we write this
-            SUT.write(channelHandlerContext, msg, channelPromise);
-            // And
-            // and we simulate that some one responded
-            verify(channelHandlerContext, times(5)).write(plcRequestContainerArgumentCaptor.capture(), any());
-            List<PlcRequestContainer> capturedDownstreamContainers = plcRequestContainerArgumentCaptor.getAllValues();
-            capturedDownstreamContainers.stream()
-                .findFirst()
-                .map(plcRequestContainer ->
-                    plcRequestContainer
-                        .getResponseFuture()
-                        .completeExceptionally(new RuntimeException("ErrorOccurred"))
+        @Nested
+        class Read {
+            @Captor
+            ArgumentCaptor<PlcRequestContainer> plcRequestContainerArgumentCaptor;
+
+            @Test
+            void simpleRead() throws Exception {
+                // Given
+                // we have a simple read
+                PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
+                // When
+                // we write this
+                SUT.write(channelHandlerContext, msg, channelPromise);
+                // And
+                // and we simulate that all get responded
+                verify(channelHandlerContext, times(5)).write(plcRequestContainerArgumentCaptor.capture(), any());
+                List<PlcRequestContainer> capturedDownstreamContainers = plcRequestContainerArgumentCaptor.getAllValues();
+                capturedDownstreamContainers.forEach(this::produceReadResponse);
+                // Then
+                // our complete container should complete normally
+                verify(responseCompletableFuture).complete(any());
+                // And we should have no memory leak
+                assertThat(SUT.getStatistics()).containsOnly(
+                    entry("queue", 0),
+                    entry("sentButUnacknowledgedSubContainer", 0),
+                    entry("correlationToParentContainer", 0),
+                    entry("containerCorrelationIdMap", 0),
+                    entry("responsesToBeDelivered", 0),
+                    entry("correlationIdGenerator", 5),
+                    entry("erroredItems", 0L),
+                    entry("deliveredItems", 5L),
+                    entry("deliveredContainers", 1L),
+                    entry("erroredContainers", 0L)
                 );
-            // Then
-            // We create SUT with 1 seconds timeout
-            TimeUnit.SECONDS.sleep(2);
-            // our complete container should complete normally
-            verify(responseCompletableFuture).completeExceptionally(any());
-            // And we should have no memory leak
-            assertThat(SUT.getStatistics()).containsOnly(
-                entry("queue", 0),
-                entry("sentButUnacknowledgedSubContainer", 0),
-                entry("correlationToParentContainer", 0),
-                entry("containerCorrelationIdMap", 0),
-                entry("responsesToBeDelivered", 0),
-                entry("correlationIdGenerator", 5),
-                entry("deliveredItems", 0L),
-                entry("erroredItems", 1L),
-                entry("deliveredContainers", 0L),
-                entry("erroredContainers", 1L)
-            );
+            }
+
+            @Test
+            void partialRead() throws Exception {
+                // Given
+                // we have a simple read
+                PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
+                // When
+                // we write this
+                SUT.write(channelHandlerContext, msg, channelPromise);
+                // And
+                // and we simulate that some one responded
+                verify(channelHandlerContext, times(5)).write(plcRequestContainerArgumentCaptor.capture(), any());
+                List<PlcRequestContainer> capturedDownstreamContainers = plcRequestContainerArgumentCaptor.getAllValues();
+                capturedDownstreamContainers.stream().findFirst().map(this::produceReadResponse);
+                // Then
+                // We create SUT with 1 seconds timeout
+                TimeUnit.SECONDS.sleep(2);
+                // our complete container should complete normally
+                verify(responseCompletableFuture).completeExceptionally(any());
+                // And we should have no memory leak
+                assertThat(SUT.getStatistics()).containsOnly(
+                    entry("queue", 0),
+                    entry("sentButUnacknowledgedSubContainer", 0),
+                    entry("correlationToParentContainer", 0),
+                    entry("containerCorrelationIdMap", 0),
+                    entry("responsesToBeDelivered", 0),
+                    entry("correlationIdGenerator", 5),
+                    entry("deliveredItems", 1L),
+                    entry("erroredItems", 4L),
+                    entry("deliveredContainers", 0L),
+                    entry("erroredContainers", 1L)
+                );
+            }
+
+            @Test
+            void partialReadOneErrored() throws Exception {
+                // Given
+                // we have a simple read
+                PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
+                // When
+                // we write this
+                SUT.write(channelHandlerContext, msg, channelPromise);
+                // And
+                // and we simulate that some one responded
+                verify(channelHandlerContext, times(5)).write(plcRequestContainerArgumentCaptor.capture(), any());
+                List<PlcRequestContainer> capturedDownstreamContainers = plcRequestContainerArgumentCaptor.getAllValues();
+                capturedDownstreamContainers.stream()
+                    .findFirst()
+                    .map(plcRequestContainer ->
+                        plcRequestContainer
+                            .getResponseFuture()
+                            .completeExceptionally(new RuntimeException("ErrorOccurred"))
+                    );
+                // Then
+                // We create SUT with 1 seconds timeout
+                TimeUnit.SECONDS.sleep(2);
+                // our complete container should complete normally
+                verify(responseCompletableFuture).completeExceptionally(any());
+                // And we should have no memory leak
+                assertThat(SUT.getStatistics()).containsOnly(
+                    entry("queue", 0),
+                    entry("sentButUnacknowledgedSubContainer", 0),
+                    entry("correlationToParentContainer", 0),
+                    entry("containerCorrelationIdMap", 0),
+                    entry("responsesToBeDelivered", 0),
+                    entry("correlationIdGenerator", 5),
+                    entry("deliveredItems", 0L),
+                    entry("erroredItems", 1L),
+                    entry("deliveredContainers", 0L),
+                    entry("erroredContainers", 1L)
+                );
+            }
+
+            @Test
+            void noRead() throws Exception {
+                // Given
+                // we have a simple read
+                PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
+                // When
+                // we write this
+                SUT.write(channelHandlerContext, msg, channelPromise);
+                // And
+                // and we simulate that some one responded
+                verify(channelHandlerContext, times(5)).write(any(), any());
+                // Then
+                // We create SUT with 1 seconds timeout
+                TimeUnit.SECONDS.sleep(2);
+                // our complete container should complete normally
+                verify(responseCompletableFuture).completeExceptionally(any());
+                // And we should have no memory leak
+                assertThat(SUT.getStatistics()).containsOnly(
+                    entry("queue", 0),
+                    entry("sentButUnacknowledgedSubContainer", 0),
+                    entry("correlationToParentContainer", 0),
+                    entry("containerCorrelationIdMap", 0),
+                    entry("responsesToBeDelivered", 0),
+                    entry("correlationIdGenerator", 5),
+                    entry("deliveredItems", 0L),
+                    entry("erroredItems", 5L),
+                    entry("deliveredContainers", 0L),
+                    entry("erroredContainers", 1L)
+                );
+            }
+
+            @SuppressWarnings("unchecked")
+            private Void produceReadResponse(PlcRequestContainer plcRequestContainer) {
+                InternalPlcReadRequest request = (InternalPlcReadRequest) plcRequestContainer.getRequest();
+                // TODO: we need a response for every item
+                String fieldName = request.getFieldNames().iterator().next();
+                CompletableFuture responseFuture = plcRequestContainer.getResponseFuture();
+                HashMap<String, Pair<PlcResponseCode, BaseDefaultFieldItem>> responseFields = new HashMap<>();
+                responseFields.put(fieldName, Pair.of(PlcResponseCode.OK, mock(BaseDefaultFieldItem.class)));
+                responseFuture.complete(new DefaultPlcReadResponse(request, responseFields));
+                return null;
+            }
         }
 
-        @Test
-        void noRead() throws Exception {
-            // Given
-            // we have a simple read
-            PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
-            // When
-            // we write this
-            SUT.write(channelHandlerContext, msg, channelPromise);
-            // And
-            // and we simulate that some one responded
-            verify(channelHandlerContext, times(5)).write(any(), any());
-            // Then
-            // We create SUT with 1 seconds timeout
-            TimeUnit.SECONDS.sleep(2);
-            // our complete container should complete normally
-            verify(responseCompletableFuture).completeExceptionally(any());
-            // And we should have no memory leak
-            assertThat(SUT.getStatistics()).containsOnly(
-                entry("queue", 0),
-                entry("sentButUnacknowledgedSubContainer", 0),
-                entry("correlationToParentContainer", 0),
-                entry("containerCorrelationIdMap", 0),
-                entry("responsesToBeDelivered", 0),
-                entry("correlationIdGenerator", 5),
-                entry("deliveredItems", 0L),
-                entry("erroredItems", 5L),
-                entry("deliveredContainers", 0L),
-                entry("erroredContainers", 1L)
-            );
+        @Nested
+        class Write {
+            // TODO: implement me
         }
 
-        @SuppressWarnings("unchecked")
-        private Void produceReadResponse(PlcRequestContainer plcRequestContainer) {
-            InternalPlcReadRequest request = (InternalPlcReadRequest) plcRequestContainer.getRequest();
-            String fieldName = request.getFieldNames().iterator().next();
-            CompletableFuture responseFuture = plcRequestContainer.getResponseFuture();
-            HashMap<String, Pair<PlcResponseCode, BaseDefaultFieldItem>> responseFields = new HashMap<>();
-            responseFields.put(fieldName, Pair.of(PlcResponseCode.OK, mock(BaseDefaultFieldItem.class)));
-            responseFuture.complete(new DefaultPlcReadResponse(request, responseFields));
-            return null;
+        @Nested
+        class Subscribe {
+            @Captor
+            ArgumentCaptor<PlcRequestContainer> plcRequestContainerArgumentCaptor;
+
+            @Test
+            void simpleSubscribe() throws Exception {
+                // Given
+                // we have a simple read
+                PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcSubscriptionRequest.build(mockSubscriber), responseCompletableFuture);
+                // When
+                // we write this
+                SUT.write(channelHandlerContext, msg, channelPromise);
+                // And
+                // and we simulate that all get responded
+                verify(channelHandlerContext, times(3)).write(plcRequestContainerArgumentCaptor.capture(), any());
+                List<PlcRequestContainer> capturedDownstreamContainers = plcRequestContainerArgumentCaptor.getAllValues();
+                capturedDownstreamContainers.forEach(this::produceSubscriptionResponse);
+                // Then
+                // our complete container should complete normally
+                verify(responseCompletableFuture).complete(any());
+                // And we should have no memory leak
+                assertThat(SUT.getStatistics()).containsOnly(
+                    entry("queue", 0),
+                    entry("sentButUnacknowledgedSubContainer", 0),
+                    entry("correlationToParentContainer", 0),
+                    entry("containerCorrelationIdMap", 0),
+                    entry("responsesToBeDelivered", 0),
+                    entry("correlationIdGenerator", 3),
+                    entry("erroredItems", 0L),
+                    entry("deliveredItems", 3L),
+                    entry("deliveredContainers", 1L),
+                    entry("erroredContainers", 0L)
+                );
+            }
+
+            @Test
+            void simpleUnsubscribe() {
+                // TODO: implement me
+            }
+
+            @SuppressWarnings("unchecked")
+            private Void produceSubscriptionResponse(PlcRequestContainer plcRequestContainer) {
+                InternalPlcSubscriptionRequest request = (InternalPlcSubscriptionRequest) plcRequestContainer.getRequest();
+                // TODO: we need a response for every item
+                String fieldName = request.getFieldNames().iterator().next();
+                CompletableFuture responseFuture = plcRequestContainer.getResponseFuture();
+                HashMap<String, Pair<PlcResponseCode, PlcSubscriptionHandle>> responseFields = new HashMap<>();
+                responseFields.put(fieldName, Pair.of(PlcResponseCode.OK, mock(PlcSubscriptionHandle.class)));
+                responseFuture.complete(new DefaultPlcSubscriptionResponse(request, responseFields));
+                return null;
+            }
         }
     }
 
@@ -437,8 +508,11 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
         }
 
         private static TestDefaultPlcSubscriptionRequest build(PlcSubscriber subscriber) {
-            // TODO: implement me once available
-            return new TestDefaultPlcSubscriptionRequest(subscriber, new LinkedHashMap<>());
+            LinkedHashMap<String, SubscriptionPlcField> fields = new LinkedHashMap<>();
+            fields.put("sub1", mock(SubscriptionPlcField.class));
+            fields.put("sub2", mock(SubscriptionPlcField.class));
+            fields.put("sub3", mock(SubscriptionPlcField.class));
+            return new TestDefaultPlcSubscriptionRequest(subscriber, fields);
         }
     }
 
@@ -449,8 +523,11 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
         }
 
         private static TestDefaultPlcUnsubscriptionRequest build(PlcSubscriber subscriber) {
-            // TODO: implement me once available
-            return new TestDefaultPlcUnsubscriptionRequest(subscriber, Collections.emptyList());
+            List<InternalPlcSubscriptionHandle> internalPlcSubscriptionHandles = new LinkedList<>();
+            internalPlcSubscriptionHandles.add(mock(InternalPlcSubscriptionHandle.class));
+            internalPlcSubscriptionHandles.add(mock(InternalPlcSubscriptionHandle.class));
+            internalPlcSubscriptionHandles.add(mock(InternalPlcSubscriptionHandle.class));
+            return new TestDefaultPlcUnsubscriptionRequest(subscriber, internalPlcSubscriptionHandles);
         }
     }
 }
\ No newline at end of file
diff --git a/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/connection/ModbusTcpPlcConnection.java b/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/connection/ModbusTcpPlcConnection.java
index 8df399e..e2ff401 100644
--- a/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/connection/ModbusTcpPlcConnection.java
+++ b/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/connection/ModbusTcpPlcConnection.java
@@ -24,8 +24,6 @@ import com.digitalpetri.modbus.codec.ModbusTcpCodec;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelInitializer;
-import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
-import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
 import org.apache.plc4x.java.base.connection.ChannelFactory;
 import org.apache.plc4x.java.base.connection.TcpSocketChannelFactory;
 import org.apache.plc4x.java.base.protocol.SingleItemToSingleRequestProtocol;
@@ -34,7 +32,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetAddress;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 public class ModbusTcpPlcConnection extends BaseModbusPlcConnection {
@@ -72,7 +69,7 @@ public class ModbusTcpPlcConnection extends BaseModbusPlcConnection {
             protected void initChannel(Channel channel) {
                 channel.pipeline().addLast(new ModbusTcpCodec(new ModbusRequestEncoder(), new ModbusResponseDecoder()));
                 channel.pipeline().addLast(new Plc4XModbusProtocol());
-                channel.pipeline().addLast(new SingleItemToSingleRequestProtocol(ModbusTcpPlcConnection.this, ModbusTcpPlcConnection.this, timer));
+                channel.pipeline().addLast(new SingleItemToSingleRequestProtocol(ModbusTcpPlcConnection.this, ModbusTcpPlcConnection.this, null, timer));
             }
         };
     }