You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2018/09/28 10:32:02 UTC

[incubator-plc4x] branch master updated (dbd84f4 -> 00dd870)

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git.


    from dbd84f4  [General] SingleItemToSingleRequestProtocol added one more test case and simplified code a bit.
     new d9f9e54  [General] SingleItemToSingleRequestProtocol BugFixes/refinement/tests
     new 1acd5f5  [General] Added TODO about broken PUB/SUB implementation in PLC4J
     new bd2bb36  [General] Added missing interface extensions to subscription API (plc4j)
     new 5507a05  [General] SingleItemToSingleRequestProtocol added some guards against unknown types and added TODO regarding subscriptions
     new 00dd870  [General] SingleItemToSingleRequestProtocol added note about wrongly instantiated timer

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../messages/DefaultPlcSubscriptionRequest.java    |  1 +
 .../messages/DefaultPlcUnsubscriptionRequest.java  |  1 +
 .../messages/InternalPlcSubscriptionRequest.java   |  2 +-
 .../messages/InternalPlcUnsubscriptionRequest.java |  2 +-
 .../SingleItemToSingleRequestProtocol.java         | 60 +++++++++-----
 .../SingleItemToSingleRequestProtocolTest.java     | 96 ++++++++++++++++++----
 6 files changed, 124 insertions(+), 38 deletions(-)


[incubator-plc4x] 03/05: [General] Added missing interface extensions to subscription API (plc4j)

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit bd2bb362be3ede5484d47fa61fce7acdfcafbe4d
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Sep 28 12:24:49 2018 +0200

    [General] Added missing interface extensions to subscription API (plc4j)
---
 .../apache/plc4x/java/base/messages/InternalPlcSubscriptionRequest.java | 2 +-
 .../plc4x/java/base/messages/InternalPlcUnsubscriptionRequest.java      | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionRequest.java
index 7cd2167..3d4ca98 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionRequest.java
@@ -21,7 +21,7 @@ package org.apache.plc4x.java.base.messages;
 import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
 import org.apache.plc4x.java.api.types.PlcSubscriptionType;
 
-public interface InternalPlcSubscriptionRequest extends PlcSubscriptionRequest {
+public interface InternalPlcSubscriptionRequest extends PlcSubscriptionRequest, InternalPlcFieldRequest {
 
     PlcSubscriptionType getPlcSubscriptionType();
 
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcUnsubscriptionRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcUnsubscriptionRequest.java
index 18b1ce5..7a8c3db 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcUnsubscriptionRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcUnsubscriptionRequest.java
@@ -23,7 +23,7 @@ import org.apache.plc4x.java.base.model.InternalPlcSubscriptionHandle;
 
 import java.util.Collection;
 
-public interface InternalPlcUnsubscriptionRequest extends PlcUnsubscriptionRequest {
+public interface InternalPlcUnsubscriptionRequest extends PlcUnsubscriptionRequest, InternalPlcFieldRequest {
 
     Collection<? extends InternalPlcSubscriptionHandle> getInternalPlcSubscriptionHandles();
 }


[incubator-plc4x] 02/05: [General] Added TODO about broken PUB/SUB implementation in PLC4J

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 1acd5f5c4910f3899a231dafb71e00ad5aede371
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Sep 28 12:05:51 2018 +0200

    [General] Added TODO about broken PUB/SUB implementation in PLC4J
---
 .../apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java   | 1 +
 .../apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionRequest.java | 1 +
 2 files changed, 2 insertions(+)

diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
index 9f59cb3..3194cf5 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
@@ -30,6 +30,7 @@ import java.time.Duration;
 import java.util.*;
 import java.util.function.BiFunction;
 
+// TODO: request broken needs finishing.
 public class DefaultPlcSubscriptionRequest implements InternalPlcSubscriptionRequest, InternalPlcFieldRequest {
 
     @Override
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionRequest.java
index 5dbccea..03e1f3e 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionRequest.java
@@ -27,6 +27,7 @@ import org.apache.plc4x.java.base.model.InternalPlcSubscriptionHandle;
 import java.util.*;
 import java.util.stream.Collectors;
 
+// TODO: request broken needs finishing.
 public class DefaultPlcUnsubscriptionRequest implements InternalPlcUnsubscriptionRequest, InternalPlcFieldRequest {
 
     private final Collection<? extends InternalPlcSubscriptionHandle> internalPlcSubscriptionHandles;


[incubator-plc4x] 04/05: [General] SingleItemToSingleRequestProtocol added some guards against unknown types and added TODO regarding subscriptions

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 5507a0580278666ef92fe6acceb9ccbe2e21247b
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Sep 28 12:26:56 2018 +0200

    [General] SingleItemToSingleRequestProtocol added some guards against
    unknown types and added TODO regarding subscriptions
---
 .../SingleItemToSingleRequestProtocol.java         | 11 ++++---
 .../SingleItemToSingleRequestProtocolTest.java     | 36 +++++++++++++++++++---
 2 files changed, 39 insertions(+), 8 deletions(-)

diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
index 3eaad6f..36fadfe 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
@@ -26,7 +26,7 @@ import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.PromiseCombiner;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.lang3.tuple.Triple;
-import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
+import org.apache.plc4x.java.api.exceptions.PlcProtocolException;
 import org.apache.plc4x.java.api.exceptions.PlcTimeoutException;
 import org.apache.plc4x.java.api.model.PlcField;
 import org.apache.plc4x.java.api.types.PlcResponseCode;
@@ -199,7 +199,8 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
 
                 plcResponse = new DefaultPlcWriteResponse(internalPlcWriteRequest, values);
             } else {
-                throw new PlcRuntimeException("Unknown type detected " + originalPlcRequestContainer.getRequest());
+                errored(currentTdpu, new PlcProtocolException("Unknown type detected " + originalPlcRequestContainer.getRequest().getClass()), originalResponseFuture);
+                return;
             }
             responsesToBeDelivered.remove(originalPlcRequestContainer);
             containerCorrelationIdMap.remove(originalPlcRequestContainer);
@@ -284,8 +285,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
                         }
                         promiseCombiner.add((Future) subPromise);
                     });
-                }
-                if (internalPlcFieldRequest instanceof InternalPlcWriteRequest) {
+                } else if (internalPlcFieldRequest instanceof InternalPlcWriteRequest) {
                     InternalPlcWriteRequest internalPlcWriteRequest = (InternalPlcWriteRequest) internalPlcFieldRequest;
                     internalPlcWriteRequest.getNamedFieldTriples().forEach(fieldItemTriple -> {
                         ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
@@ -308,6 +308,9 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
                         }
                         promiseCombiner.add((Future) subPromise);
                     });
+                    // TODO: add sub/unsub
+                } else {
+                    throw new PlcProtocolException("Unmapped request type " + request.getClass());
                 }
             } else {
                 ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
diff --git a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
index a75a86e..c6bec89 100644
--- a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
+++ b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
@@ -29,6 +29,7 @@ import org.apache.plc4x.java.api.model.PlcField;
 import org.apache.plc4x.java.api.types.PlcResponseCode;
 import org.apache.plc4x.java.base.messages.*;
 import org.apache.plc4x.java.base.messages.items.FieldItem;
+import org.apache.plc4x.java.base.model.InternalPlcSubscriptionHandle;
 import org.assertj.core.api.WithAssertions;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -38,10 +39,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.*;
 import org.mockito.junit.jupiter.MockitoExtension;
 
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -375,6 +373,16 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
         }
 
         @Test
+        void subscribe() throws Exception {
+            // TODO: implement once available
+        }
+
+        @Test
+        void unsubcribe() throws Exception {
+            // TODO: implement once available
+        }
+
+        @Test
         void trySendingMessages() throws Exception {
             PendingWriteQueue queue = (PendingWriteQueue) FieldUtils.getField(SUT.getClass(), "queue", true).get(SUT);
             assertThat(queue.size()).isLessThanOrEqualTo(0);
@@ -410,4 +418,24 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
             return new TestDefaultPlcWriteRequest(fields);
         }
     }
+
+    private static class TestDefaultPlcSubscriptionRequest extends DefaultPlcSubscriptionRequest {
+
+        private static TestDefaultPlcSubscriptionRequest build() {
+            // TODO: implement me once available
+            return new TestDefaultPlcSubscriptionRequest();
+        }
+    }
+
+    private static class TestDefaultPlcUnsubscriptionRequest extends DefaultPlcUnsubscriptionRequest {
+
+        private TestDefaultPlcUnsubscriptionRequest(Collection<? extends InternalPlcSubscriptionHandle> internalPlcSubscriptionHandles) {
+            super(internalPlcSubscriptionHandles);
+        }
+
+        private static TestDefaultPlcUnsubscriptionRequest build() {
+            // TODO: implement me once available
+            return new TestDefaultPlcUnsubscriptionRequest(Collections.emptyList());
+        }
+    }
 }
\ No newline at end of file


[incubator-plc4x] 05/05: [General] SingleItemToSingleRequestProtocol added note about wrongly instantiated timer

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 00dd8704bde102bc5e6545eb0c714aba0edddcc7
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Sep 28 12:31:53 2018 +0200

    [General] SingleItemToSingleRequestProtocol added note about wrongly
    instantiated timer
---
 .../plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
index 36fadfe..4d961c6 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
@@ -104,6 +104,12 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
     @Override
     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
         this.queue = new PendingWriteQueue(ctx);
+        /*
+         * TODO: this needs to be supplied globally
+         * from {@link HashedWheelTimer}:
+         * One of the common mistakes, that makes
+         * your application unresponsive, is to create a new instance for every connection
+         */
         this.timer = new HashedWheelTimer();
         this.scheduledTimeouts = new ConcurrentHashMap<>();
         this.sentButUnacknowledgedSubContainer = new ConcurrentHashMap<>();


[incubator-plc4x] 01/05: [General] SingleItemToSingleRequestProtocol BugFixes/refinement/tests

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit d9f9e543c1726a812f589548846039c5503a99b3
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Sep 28 11:54:19 2018 +0200

    [General] SingleItemToSingleRequestProtocol BugFixes/refinement/tests
---
 .../SingleItemToSingleRequestProtocol.java         | 43 ++++++++++------
 .../SingleItemToSingleRequestProtocolTest.java     | 60 ++++++++++++++++++----
 2 files changed, 75 insertions(+), 28 deletions(-)

diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
index bdd1a7f..3eaad6f 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
@@ -36,10 +36,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.*;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
@@ -64,13 +61,15 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
     private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>> sentButUnacknowledgedSubContainer;
 
     // Map to map tdpu to original parent container
+    // TODO: currently this could be supplied via param, only reason to keep would be for statistics.
     private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>> correlationToParentContainer;
 
     // Map to track tdpus per container
+    // TODO: currently this could be supplied via param, only reason to keep would be for statistics.
     private ConcurrentMap<PlcRequestContainer<?, ?>, Set<Integer>> containerCorrelationIdMap;
 
     // Map to track a list of responses per parent container
-    private ConcurrentMap<PlcRequestContainer<?, ?>, List<InternalPlcResponse<?>>> responsesToBeDelivered;
+    private ConcurrentMap<PlcRequestContainer<?, ?>, Queue<InternalPlcResponse<?>>> responsesToBeDelivered;
 
     private AtomicInteger correlationIdGenerator;
 
@@ -158,19 +157,19 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
     // Decoding
     ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
-    protected void tryFinish(Integer correlationId, InternalPlcResponse msg, CompletableFuture<InternalPlcResponse<?>> originalResponseFuture) {
+    protected void tryFinish(Integer currentTdpu, InternalPlcResponse msg, CompletableFuture<InternalPlcResponse<?>> originalResponseFuture) {
         deliveredItems.incrementAndGet();
-        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> subPlcRequestContainer = sentButUnacknowledgedSubContainer.remove(correlationId);
+        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> subPlcRequestContainer = sentButUnacknowledgedSubContainer.remove(currentTdpu);
         LOGGER.info("{} got acknowledged", subPlcRequestContainer);
-        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> originalPlcRequestContainer = correlationToParentContainer.remove(correlationId);
+        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> originalPlcRequestContainer = correlationToParentContainer.remove(currentTdpu);
         if (originalPlcRequestContainer == null) {
             LOGGER.warn("Unrelated package received {}", msg);
             return;
         }
-        List<InternalPlcResponse<?>> correlatedResponseItems = responsesToBeDelivered.computeIfAbsent(originalPlcRequestContainer, ignore -> new LinkedList<>());
+        Queue<InternalPlcResponse<?>> correlatedResponseItems = responsesToBeDelivered.computeIfAbsent(originalPlcRequestContainer, ignore -> new ConcurrentLinkedQueue<>());
         correlatedResponseItems.add(msg);
         Set<Integer> integers = containerCorrelationIdMap.get(originalPlcRequestContainer);
-        integers.remove(correlationId);
+        integers.remove(currentTdpu);
         if (integers.isEmpty()) {
             deliveredContainers.incrementAndGet();
             Timeout timeout = scheduledTimeouts.remove(originalPlcRequestContainer);
@@ -208,15 +207,15 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
         }
     }
 
-    protected void errored(int correlationId, Throwable throwable, CompletableFuture<InternalPlcResponse<?>> originalResponseFuture) {
+    protected void errored(Integer currentTdpu, Throwable throwable, CompletableFuture<InternalPlcResponse<?>> originalResponseFuture) {
         erroredItems.incrementAndGet();
-        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> subPlcRequestContainer = sentButUnacknowledgedSubContainer.remove(correlationId);
+        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> subPlcRequestContainer = sentButUnacknowledgedSubContainer.remove(currentTdpu);
         LOGGER.info("{} got errored", subPlcRequestContainer);
 
 
-        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> originalPlcRequestContainer = correlationToParentContainer.remove(correlationId);
+        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> originalPlcRequestContainer = correlationToParentContainer.remove(currentTdpu);
         if (originalPlcRequestContainer == null) {
-            LOGGER.warn("Unrelated error received correlationId:{}", correlationId, throwable);
+            LOGGER.warn("Unrelated error received tdpu:{}", currentTdpu, throwable);
         } else {
             erroredContainers.incrementAndGet();
             Timeout timeout = scheduledTimeouts.remove(originalPlcRequestContainer);
@@ -224,8 +223,18 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
                 timeout.cancel();
             }
             responsesToBeDelivered.remove(originalPlcRequestContainer);
-            containerCorrelationIdMap.remove(originalPlcRequestContainer);
-            LOGGER.warn("PlcRequestContainer {} and correlationId {} failed ", correlationToParentContainer, correlationId, throwable);
+
+            Set<Integer> tdpus = containerCorrelationIdMap.remove(originalPlcRequestContainer);
+            if (tdpus != null) {
+                tdpus.forEach(tdpu -> {
+                    // TODO: technically the other items didn't error so do we increment?
+                    //erroredItems.incrementAndGet();
+                    sentButUnacknowledgedSubContainer.remove(tdpu);
+                    correlationToParentContainer.remove(tdpu);
+                });
+            }
+
+            LOGGER.warn("PlcRequestContainer {} and correlationId {} failed ", correlationToParentContainer, currentTdpu, throwable);
             originalResponseFuture.completeExceptionally(throwable);
         }
     }
@@ -239,7 +248,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
         if (msg instanceof PlcRequestContainer) {
             @SuppressWarnings("unchecked")
             PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> in = (PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>) msg;
-            Set<Integer> tdpus = containerCorrelationIdMap.computeIfAbsent(in, plcRequestContainer -> new HashSet<>());
+            Set<Integer> tdpus = containerCorrelationIdMap.computeIfAbsent(in, plcRequestContainer -> ConcurrentHashMap.newKeySet());
 
             Timeout timeout = timer.newTimeout(timeout_ -> handleTimeout(timeout_, in, tdpus, System.nanoTime()), defaultReceiveTimeout, TimeUnit.MILLISECONDS);
             scheduledTimeouts.put(in, timeout);
diff --git a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
index 045cbff..a75a86e 100644
--- a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
+++ b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
@@ -21,6 +21,8 @@ package org.apache.plc4x.java.base.protocol;
 
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
+import io.netty.channel.PendingWriteQueue;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.plc4x.java.api.messages.PlcFieldRequest;
 import org.apache.plc4x.java.api.model.PlcField;
@@ -43,6 +45,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.*;
@@ -197,6 +200,45 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
         }
 
         @Test
+        void partialReadOneErrored() throws Exception {
+            // Given
+            // we have a simple read
+            PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(), responseCompletableFuture);
+            // When
+            // we write this
+            SUT.write(channelHandlerContext, msg, channelPromise);
+            // And
+            // and we simulate that some one responded
+            verify(channelHandlerContext, times(5)).write(plcRequestContainerArgumentCaptor.capture(), any());
+            List<PlcRequestContainer> capturedDownstreamContainers = plcRequestContainerArgumentCaptor.getAllValues();
+            capturedDownstreamContainers.stream()
+                .findFirst()
+                .map(plcRequestContainer ->
+                    plcRequestContainer
+                        .getResponseFuture()
+                        .completeExceptionally(new RuntimeException("ErrorOccurred"))
+                );
+            // Then
+            // We create SUT with 1 seconds timeout
+            TimeUnit.SECONDS.sleep(2);
+            // our complete container should complete normally
+            verify(responseCompletableFuture).completeExceptionally(any());
+            // And we should have no memory leak
+            assertThat(SUT.getStatistics()).containsOnly(
+                entry("queue", 0),
+                entry("sentButUnacknowledgedSubContainer", 0),
+                entry("correlationToParentContainer", 0),
+                entry("containerCorrelationIdMap", 0),
+                entry("responsesToBeDelivered", 0),
+                entry("correlationIdGenerator", 5),
+                entry("deliveredItems", 0L),
+                entry("erroredItems", 1L),
+                entry("deliveredContainers", 0L),
+                entry("erroredContainers", 1L)
+            );
+        }
+
+        @Test
         void noRead() throws Exception {
             // Given
             // we have a simple read
@@ -334,8 +376,12 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
 
         @Test
         void trySendingMessages() throws Exception {
+            PendingWriteQueue queue = (PendingWriteQueue) FieldUtils.getField(SUT.getClass(), "queue", true).get(SUT);
+            assertThat(queue.size()).isLessThanOrEqualTo(0);
+            queue.add(mock(PlcRequestContainer.class), channelPromise);
+            assertThat(queue.size()).isGreaterThan(0);
             SUT.trySendingMessages(channelHandlerContext);
-            // TODO: add assertions
+            assertThat(queue.size()).isLessThanOrEqualTo(0);
         }
     }
 
@@ -347,11 +393,7 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
 
         private static TestDefaultPlcReadRequest build() {
             LinkedHashMap<String, PlcField> fields = new LinkedHashMap<>();
-            fields.put("readField1", mock(PlcField.class));
-            fields.put("readField2", mock(PlcField.class));
-            fields.put("readField3", mock(PlcField.class));
-            fields.put("readField4", mock(PlcField.class));
-            fields.put("readField5", mock(PlcField.class));
+            IntStream.rangeClosed(1, 5).forEach(i -> fields.put("readField" + i, mock(PlcField.class)));
             return new TestDefaultPlcReadRequest(fields);
         }
     }
@@ -364,11 +406,7 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
 
         private static TestDefaultPlcWriteRequest build() {
             LinkedHashMap<String, Pair<PlcField, FieldItem>> fields = new LinkedHashMap<>();
-            fields.put("writeField1", Pair.of(mock(PlcField.class), mock(FieldItem.class)));
-            fields.put("writeField2", Pair.of(mock(PlcField.class), mock(FieldItem.class)));
-            fields.put("writeField3", Pair.of(mock(PlcField.class), mock(FieldItem.class)));
-            fields.put("writeField4", Pair.of(mock(PlcField.class), mock(FieldItem.class)));
-            fields.put("writeField5", Pair.of(mock(PlcField.class), mock(FieldItem.class)));
+            IntStream.rangeClosed(1, 5).forEach(i -> fields.put("writeField" + i, Pair.of(mock(PlcField.class), mock(FieldItem.class))));
             return new TestDefaultPlcWriteRequest(fields);
         }
     }