You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2018/10/29 08:09:46 UTC

[incubator-plc4x] branch master updated (19d445f -> f257a6d)

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git.


    from 19d445f  - Fixing sonar issues
     new 6fc11d9  [driver-bases] re-enabled unsubscription support on SingleItemToSingleRequestProtocol
     new f257a6d  [driver-bases] SingleItemToSingleRequestProtocol: added SplitConfig for granular config of item splitting.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/ads/connection/AdsTcpPlcConnection.java   |   4 +-
 .../messages/DefaultPlcUnsubscriptionResponse.java |   8 +-
 .../InternalPlcUnsubscriptionResponse.java         |   2 +-
 .../SingleItemToSingleRequestProtocol.java         | 152 ++++++++++++++--
 .../SingleItemToSingleRequestProtocolTest.java     | 200 ++++++++++++++++++++-
 5 files changed, 350 insertions(+), 16 deletions(-)


[incubator-plc4x] 01/02: [driver-bases] re-enabled unsubscription support on SingleItemToSingleRequestProtocol

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 6fc11d9d8cb4794c05f998965869080ab07b7571
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Oct 29 07:54:08 2018 +0100

    [driver-bases] re-enabled unsubscription support on SingleItemToSingleRequestProtocol
---
 .../java/ads/connection/AdsTcpPlcConnection.java   |  2 +-
 .../messages/DefaultPlcUnsubscriptionResponse.java |  8 +++-
 .../InternalPlcUnsubscriptionResponse.java         |  2 +-
 .../SingleItemToSingleRequestProtocol.java         | 53 +++++++++++++++++++++-
 .../SingleItemToSingleRequestProtocolTest.java     | 41 ++++++++++++++++-
 5 files changed, 99 insertions(+), 7 deletions(-)

diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
index cbd5a5d..5d10a01 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
@@ -255,7 +255,7 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc
             }
         }
         CompletableFuture<PlcUnsubscriptionResponse> future = new CompletableFuture<>();
-        future.complete(new DefaultPlcUnsubscriptionResponse());
+        future.complete(new DefaultPlcUnsubscriptionResponse(internalPlcUnsubscriptionRequest));
         return future;
     }
 
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionResponse.java
index e3f855a..c26ac85 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionResponse.java
@@ -22,9 +22,15 @@ import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
 
 public class DefaultPlcUnsubscriptionResponse implements InternalPlcUnsubscriptionResponse {
 
+    private final InternalPlcUnsubscriptionRequest request;
+
+    public DefaultPlcUnsubscriptionResponse(InternalPlcUnsubscriptionRequest request) {
+        this.request = request;
+    }
+
     @Override
     public PlcUnsubscriptionRequest getRequest() {
-        return null;
+        return request;
     }
 
 }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcUnsubscriptionResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcUnsubscriptionResponse.java
index a857cb6..8529d55 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcUnsubscriptionResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcUnsubscriptionResponse.java
@@ -20,6 +20,6 @@ package org.apache.plc4x.java.base.messages;
 
 import org.apache.plc4x.java.api.messages.PlcUnsubscriptionResponse;
 
-public interface InternalPlcUnsubscriptionResponse extends PlcUnsubscriptionResponse {
+public interface InternalPlcUnsubscriptionResponse extends PlcUnsubscriptionResponse, InternalPlcResponse {
 
 }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
index 22c210e..a6f1471 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
@@ -32,6 +32,7 @@ import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
 import org.apache.plc4x.java.api.types.PlcResponseCode;
 import org.apache.plc4x.java.base.messages.*;
 import org.apache.plc4x.java.base.messages.items.BaseDefaultFieldItem;
+import org.apache.plc4x.java.base.model.InternalPlcSubscriptionHandle;
 import org.apache.plc4x.java.base.model.SubscriptionPlcField;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,6 +46,7 @@ import java.util.stream.Collectors;
 /**
  * This layer can be used to split a {@link org.apache.plc4x.java.api.messages.PlcRequest} which addresses multiple {@link PlcField}s into multiple subsequent {@link org.apache.plc4x.java.api.messages.PlcRequest}s.
  */
+// TODO: add split config so we can override special requests that are allready splitted downstream
 public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
 
     public static final Logger LOGGER = LoggerFactory.getLogger(SingleItemToSingleRequestProtocol.class);
@@ -217,6 +219,9 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
                     .forEach(stringPairMap -> stringPairMap.forEach(fields::put));
 
                 plcResponse = new DefaultPlcSubscriptionResponse(internalPlcSubscriptionRequest, fields);
+            } else if (originalPlcRequestContainer.getRequest() instanceof InternalPlcUnsubscriptionRequest) {
+                InternalPlcUnsubscriptionRequest internalPlcUnsubscriptionRequest = (InternalPlcUnsubscriptionRequest) originalPlcRequestContainer.getRequest();
+                plcResponse = new DefaultPlcUnsubscriptionResponse(internalPlcUnsubscriptionRequest);
             } else {
                 errored(currentTdpu, new PlcProtocolException("Unknown type detected " + originalPlcRequestContainer.getRequest().getClass()), originalResponseFuture);
                 return;
@@ -327,7 +332,6 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
                         }
                         promiseCombiner.add((Future) subPromise);
                     });
-                    // TODO: add sub/unsub
                 } else if (internalPlcFieldRequest instanceof InternalPlcSubscriptionRequest) {
                     InternalPlcSubscriptionRequest internalPlcSubscriptionRequest = (InternalPlcSubscriptionRequest) internalPlcFieldRequest;
                     internalPlcSubscriptionRequest.getNamedSubscriptionFields().forEach(field -> {
@@ -353,10 +357,34 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
                         }
                         promiseCombiner.add((Future) subPromise);
                     });
-                    // TODO: add sub/unsub
                 } else {
                     throw new PlcProtocolException("Unmapped request type " + request.getClass());
                 }
+            } else if (request instanceof InternalPlcUnsubscriptionRequest) {
+                InternalPlcUnsubscriptionRequest internalPlcUnsubscriptionRequest = (InternalPlcUnsubscriptionRequest) request;
+                internalPlcUnsubscriptionRequest.getInternalPlcSubscriptionHandles().forEach(handle -> {
+                    ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
+
+                    Integer tdpu = correlationIdGenerator.getAndIncrement();
+                    CompletableFuture<InternalPlcResponse> correlatedCompletableFuture = new CompletableFuture<>();
+                    // Important: don't chain to above as we want the above to be completed not the result of when complete
+                    correlatedCompletableFuture
+                        .thenApply(InternalPlcResponse.class::cast)
+                        .whenComplete((internalPlcResponse, throwable) -> {
+                            if (throwable != null) {
+                                errored(tdpu, throwable, in.getResponseFuture());
+                            } else {
+                                tryFinish(tdpu, internalPlcResponse, in.getResponseFuture());
+                            }
+                        });
+                    PlcRequestContainer<CorrelatedPlcUnsubscriptionRequest, InternalPlcResponse> correlatedPlcRequestContainer = new PlcRequestContainer<>(CorrelatedPlcUnsubscriptionRequest.of(subscriber, handle, tdpu), correlatedCompletableFuture);
+                    correlationToParentContainer.put(tdpu, in);
+                    queue.add(correlatedPlcRequestContainer, subPromise);
+                    if (!tdpus.add(tdpu)) {
+                        throw new IllegalStateException("AtomicInteger should not create duplicated ids: " + tdpu);
+                    }
+                    promiseCombiner.add((Future) subPromise);
+                });
             } else {
                 ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
                 queue.add(msg, subPromise);
@@ -492,6 +520,27 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
         }
     }
 
+    protected static class CorrelatedPlcUnsubscriptionRequest extends DefaultPlcUnsubscriptionRequest implements CorrelatedPlcRequest {
+
+        protected final int tdpu;
+
+        protected CorrelatedPlcUnsubscriptionRequest(PlcSubscriber subscriber, LinkedList<InternalPlcSubscriptionHandle> subscriptionHandles, int tdpu) {
+            super(subscriber, subscriptionHandles);
+            this.tdpu = tdpu;
+        }
+
+        protected static CorrelatedPlcUnsubscriptionRequest of(PlcSubscriber subscriber, InternalPlcSubscriptionHandle subscriptionHandle, int tdpu) {
+            LinkedList<InternalPlcSubscriptionHandle> list = new LinkedList<>();
+            list.add(subscriptionHandle);
+            return new CorrelatedPlcUnsubscriptionRequest(subscriber, list, tdpu);
+        }
+
+        @Override
+        public int getTdpu() {
+            return tdpu;
+        }
+    }
+
     // TODO: maybe export to jmx
     public Map<String, Number> getStatistics() {
         HashMap<String, Number> statistics = new HashMap<>();
diff --git a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
index fb85d2b..f3a3592 100644
--- a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
+++ b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
@@ -344,8 +344,34 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
             }
 
             @Test
-            void simpleUnsubscribe() {
-                // TODO: implement me
+            void simpleUnsubscribe() throws Exception {
+                // Given
+                // we have a simple read
+                PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcUnsubscriptionRequest.build(mockSubscriber), responseCompletableFuture);
+                // When
+                // we write this
+                SUT.write(channelHandlerContext, msg, channelPromise);
+                // And
+                // and we simulate that all get responded
+                verify(channelHandlerContext, times(3)).write(plcRequestContainerArgumentCaptor.capture(), any());
+                List<PlcRequestContainer> capturedDownstreamContainers = plcRequestContainerArgumentCaptor.getAllValues();
+                capturedDownstreamContainers.forEach(this::produceUnsubscriptionResponse);
+                // Then
+                // our complete container should complete normally
+                verify(responseCompletableFuture).complete(any());
+                // And we should have no memory leak
+                assertThat(SUT.getStatistics()).containsOnly(
+                    entry("queue", 0),
+                    entry("sentButUnacknowledgedSubContainer", 0),
+                    entry("correlationToParentContainer", 0),
+                    entry("containerCorrelationIdMap", 0),
+                    entry("responsesToBeDelivered", 0),
+                    entry("correlationIdGenerator", 3),
+                    entry("erroredItems", 0L),
+                    entry("deliveredItems", 3L),
+                    entry("deliveredContainers", 1L),
+                    entry("erroredContainers", 0L)
+                );
             }
 
             @SuppressWarnings("unchecked")
@@ -359,6 +385,17 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
                 responseFuture.complete(new DefaultPlcSubscriptionResponse(request, responseFields));
                 return null;
             }
+
+            @SuppressWarnings("unchecked")
+            private Void produceUnsubscriptionResponse(PlcRequestContainer plcRequestContainer) {
+                InternalPlcUnsubscriptionRequest request = (InternalPlcUnsubscriptionRequest) plcRequestContainer.getRequest();
+                // TODO: we need a response for every item
+                InternalPlcSubscriptionHandle internalPlcSubscriptionHandle = request.getInternalPlcSubscriptionHandles().iterator().next();
+                // TODO: handles ignored for now.
+                CompletableFuture responseFuture = plcRequestContainer.getResponseFuture();
+                responseFuture.complete(new DefaultPlcUnsubscriptionResponse(request));
+                return null;
+            }
         }
     }
 


[incubator-plc4x] 02/02: [driver-bases] SingleItemToSingleRequestProtocol: added SplitConfig for granular config of item splitting.

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit f257a6d24a132a1f5a3995a2a5e5a2ce8c7ed5b8
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Oct 29 08:27:07 2018 +0100

    [driver-bases] SingleItemToSingleRequestProtocol: added SplitConfig for
    granular config of item splitting.
---
 .../java/ads/connection/AdsTcpPlcConnection.java   |   2 +-
 .../SingleItemToSingleRequestProtocol.java         | 101 +++++++++++--
 .../SingleItemToSingleRequestProtocolTest.java     | 159 +++++++++++++++++++++
 3 files changed, 252 insertions(+), 10 deletions(-)

diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
index 5d10a01..0f74c44 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
@@ -117,7 +117,7 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc
                 pipeline.addLast(new Payload2TcpProtocol());
                 pipeline.addLast(new Ads2PayloadProtocol());
                 pipeline.addLast(new Plc4x2AdsProtocol(targetAmsNetId, targetAmsPort, sourceAmsNetId, sourceAmsPort, fieldMapping));
-                pipeline.addLast(new SingleItemToSingleRequestProtocol(AdsTcpPlcConnection.this, AdsTcpPlcConnection.this, AdsTcpPlcConnection.this, timer));
+                pipeline.addLast(new SingleItemToSingleRequestProtocol(AdsTcpPlcConnection.this, AdsTcpPlcConnection.this, AdsTcpPlcConnection.this, timer, SingleItemToSingleRequestProtocol.SplitConfig.builder().dontSplitSubscribe().dontSplitUnsubscribe().build()));
             }
         };
     }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
index a6f1471..cbd5b1a 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
@@ -91,20 +91,27 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
 
     private AtomicLong erroredItems;
 
+    private SplitConfig splitConfig;
+
     public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, PlcSubscriber subscriber, Timer timer) {
-        this(reader, writer, subscriber, timer, true);
+        this(reader, writer, subscriber, timer, new SplitConfig());
+    }
+
+    public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, PlcSubscriber subscriber, Timer timer, SplitConfig splitConfig) {
+        this(reader, writer, subscriber, timer, splitConfig, true);
     }
 
-    public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, PlcSubscriber subscriber, Timer timer, boolean betterImplementationPossible) {
-        this(reader, writer, subscriber, timer, TimeUnit.SECONDS.toMillis(30), betterImplementationPossible);
+    public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, PlcSubscriber subscriber, Timer timer, SplitConfig splitConfig, boolean betterImplementationPossible) {
+        this(reader, writer, subscriber, timer, TimeUnit.SECONDS.toMillis(30), splitConfig, betterImplementationPossible);
     }
 
-    public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, PlcSubscriber subscriber, Timer timer, long defaultReceiveTimeout, boolean betterImplementationPossible) {
+    public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, PlcSubscriber subscriber, Timer timer, long defaultReceiveTimeout, SplitConfig splitConfig, boolean betterImplementationPossible) {
         this.reader = reader;
         this.writer = writer;
         this.subscriber = subscriber;
         this.timer = timer;
         this.defaultReceiveTimeout = defaultReceiveTimeout;
+        this.splitConfig = splitConfig;
         if (betterImplementationPossible) {
             String callStack = Arrays.stream(Thread.currentThread().getStackTrace())
                 .map(StackTraceElement::toString)
@@ -281,10 +288,10 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
             // Create a promise that has to be called multiple times.
             PromiseCombiner promiseCombiner = new PromiseCombiner();
             InternalPlcRequest request = in.getRequest();
-            if (request instanceof InternalPlcFieldRequest) {
+            if (request instanceof InternalPlcFieldRequest && (splitConfig.splitRead || splitConfig.splitWrite || splitConfig.splitSubscription)) {
                 InternalPlcFieldRequest internalPlcFieldRequest = (InternalPlcFieldRequest) request;
 
-                if (internalPlcFieldRequest instanceof InternalPlcReadRequest) {
+                if (internalPlcFieldRequest instanceof InternalPlcReadRequest && splitConfig.splitRead) {
                     InternalPlcReadRequest internalPlcReadRequest = (InternalPlcReadRequest) internalPlcFieldRequest;
                     internalPlcReadRequest.getNamedFields().forEach(field -> {
                         ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
@@ -309,7 +316,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
                         }
                         promiseCombiner.add((Future) subPromise);
                     });
-                } else if (internalPlcFieldRequest instanceof InternalPlcWriteRequest) {
+                } else if (internalPlcFieldRequest instanceof InternalPlcWriteRequest && splitConfig.splitWrite) {
                     InternalPlcWriteRequest internalPlcWriteRequest = (InternalPlcWriteRequest) internalPlcFieldRequest;
                     internalPlcWriteRequest.getNamedFieldTriples().forEach(fieldItemTriple -> {
                         ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
@@ -332,7 +339,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
                         }
                         promiseCombiner.add((Future) subPromise);
                     });
-                } else if (internalPlcFieldRequest instanceof InternalPlcSubscriptionRequest) {
+                } else if (internalPlcFieldRequest instanceof InternalPlcSubscriptionRequest && splitConfig.splitSubscription) {
                     InternalPlcSubscriptionRequest internalPlcSubscriptionRequest = (InternalPlcSubscriptionRequest) internalPlcFieldRequest;
                     internalPlcSubscriptionRequest.getNamedSubscriptionFields().forEach(field -> {
                         ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
@@ -360,7 +367,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
                 } else {
                     throw new PlcProtocolException("Unmapped request type " + request.getClass());
                 }
-            } else if (request instanceof InternalPlcUnsubscriptionRequest) {
+            } else if (request instanceof InternalPlcUnsubscriptionRequest && splitConfig.splitUnsubscription) {
                 InternalPlcUnsubscriptionRequest internalPlcUnsubscriptionRequest = (InternalPlcUnsubscriptionRequest) request;
                 internalPlcUnsubscriptionRequest.getInternalPlcSubscriptionHandles().forEach(handle -> {
                     ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
@@ -556,4 +563,80 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
         statistics.put("erroredContainers", erroredContainers.get());
         return statistics;
     }
+
+    public static class SplitConfig {
+        private final boolean splitRead;
+        private final boolean splitWrite;
+        private final boolean splitSubscription;
+        private final boolean splitUnsubscription;
+
+        public SplitConfig() {
+            splitRead = true;
+            splitWrite = true;
+            splitSubscription = true;
+            splitUnsubscription = true;
+        }
+
+        private SplitConfig(boolean splitRead, boolean splitWrite, boolean splitSubscription, boolean splitUnsubscription) {
+            this.splitRead = splitRead;
+            this.splitWrite = splitWrite;
+            this.splitSubscription = splitSubscription;
+            this.splitUnsubscription = splitUnsubscription;
+        }
+
+        public static SplitConfigBuilder builder() {
+            return new SplitConfigBuilder();
+        }
+
+        public static class SplitConfigBuilder {
+            private boolean splitRead = true;
+            private boolean splitWrite = true;
+            private boolean splitSubscription = true;
+            private boolean splitUnsubscription = true;
+
+            public SplitConfigBuilder splitRead() {
+                splitRead = true;
+                return this;
+            }
+
+            public SplitConfigBuilder dontSplitRead() {
+                splitRead = false;
+                return this;
+            }
+
+            public SplitConfigBuilder splitWrite() {
+                splitWrite = true;
+                return this;
+            }
+
+            public SplitConfigBuilder dontSplitWrite() {
+                splitWrite = false;
+                return this;
+            }
+
+            public SplitConfigBuilder splitSubscribe() {
+                splitSubscription = true;
+                return this;
+            }
+
+            public SplitConfigBuilder dontSplitSubscribe() {
+                splitSubscription = false;
+                return this;
+            }
+
+            public SplitConfigBuilder splitUnsubscribe() {
+                splitUnsubscription = true;
+                return this;
+            }
+
+            public SplitConfigBuilder dontSplitUnsubscribe() {
+                splitUnsubscription = false;
+                return this;
+            }
+
+            public SplitConfig build() {
+                return new SplitConfig(splitRead, splitWrite, splitSubscription, splitUnsubscription);
+            }
+        }
+    }
 }
diff --git a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
index f3a3592..cd1216c 100644
--- a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
+++ b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
@@ -83,6 +83,7 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
             mockSubscriber,
             new HashedWheelTimer(),
             TimeUnit.SECONDS.toMillis(1),
+            new SingleItemToSingleRequestProtocol.SplitConfig(),
             false
         );
         SUT.channelRegistered(channelHandlerContext);
@@ -149,6 +150,164 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
     }
 
     @Nested
+    class SplitConfig {
+
+        @Nested
+        class SplitOn {
+            @BeforeEach
+            void setUp() throws Exception {
+                // We setup the SUT with a special configuration
+                SUT = new SingleItemToSingleRequestProtocol(
+                    mockReader,
+                    mockWriter,
+                    mockSubscriber,
+                    new HashedWheelTimer(),
+                    TimeUnit.SECONDS.toMillis(1),
+                    SingleItemToSingleRequestProtocol.SplitConfig.builder()
+                        .dontSplitRead()
+                        .dontSplitWrite()
+                        .dontSplitSubscribe()
+                        .dontSplitUnsubscribe()
+                        .build(),
+                    false
+                );
+                SUT.channelRegistered(channelHandlerContext);
+                when(channelHandlerContext.executor().inEventLoop()).thenReturn(true);
+            }
+
+            @Test
+            void read() throws Exception {
+                // Given
+                // we have a simple read
+                PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
+                // When
+                // we write this
+                SUT.write(channelHandlerContext, msg, channelPromise);
+                // then
+                // we should invoke this only one time
+                verify(channelHandlerContext, times(1)).write(eq(msg), any());
+            }
+
+            @Test
+            void write() throws Exception {
+                // Given
+                // we have a simple write
+                PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcWriteRequest.build(mockWriter), responseCompletableFuture);
+                // When
+                // we write this
+                SUT.write(channelHandlerContext, msg, channelPromise);
+                // then
+                // we should invoke this only one time
+                verify(channelHandlerContext, times(1)).write(eq(msg), any());
+            }
+
+            @Test
+            void subscribe() throws Exception {
+                // Given
+                // we have a simple subscribe
+                PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcSubscriptionRequest.build(mockSubscriber), responseCompletableFuture);
+                // When
+                // we write this
+                SUT.write(channelHandlerContext, msg, channelPromise);
+                // then
+                // we should invoke this only one time
+                verify(channelHandlerContext, times(1)).write(eq(msg), any());
+            }
+
+            @Test
+            void unsubsribe() throws Exception {
+                // Given
+                // we have a simple unsubscribe
+                PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcUnsubscriptionRequest.build(mockSubscriber), responseCompletableFuture);
+                // When
+                // we write this
+                SUT.write(channelHandlerContext, msg, channelPromise);
+                // then
+                // we should invoke this only one time
+                verify(channelHandlerContext, times(1)).write(eq(msg), any());
+            }
+        }
+
+        @Nested
+        class SplitOff {
+            @BeforeEach
+            void setUp() throws Exception {
+                // We setup the SUT with a special configuration
+                SUT = new SingleItemToSingleRequestProtocol(
+                    mockReader,
+                    mockWriter,
+                    mockSubscriber,
+                    new HashedWheelTimer(),
+                    TimeUnit.SECONDS.toMillis(1),
+                    SingleItemToSingleRequestProtocol.SplitConfig.builder()
+                        .splitRead()
+                        .splitWrite()
+                        .splitSubscribe()
+                        .splitUnsubscribe()
+                        .build(),
+                    false
+                );
+                SUT.channelRegistered(channelHandlerContext);
+                when(channelHandlerContext.executor().inEventLoop()).thenReturn(true);
+            }
+
+            @Test
+            void read() throws Exception {
+                // Given
+                // we have a simple read
+                PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
+                // When
+                // we write this
+                SUT.write(channelHandlerContext, msg, channelPromise);
+                // then
+                // we should invoke this only one time
+                verify(channelHandlerContext, times(5)).write(any(), any());
+            }
+
+            @Test
+            void write() throws Exception {
+                // Given
+                // we have a simple write
+                PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcWriteRequest.build(mockWriter), responseCompletableFuture);
+                // When
+                // we write this
+                SUT.write(channelHandlerContext, msg, channelPromise);
+                // then
+                // we should invoke this only one time
+                verify(channelHandlerContext, times(5)).write(any(), any());
+            }
+
+            @Test
+            void subscribe() throws Exception {
+                // Given
+                // we have a simple subscribe
+                PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcSubscriptionRequest.build(mockSubscriber), responseCompletableFuture);
+                // When
+                // we write this
+                SUT.write(channelHandlerContext, msg, channelPromise);
+                // then
+                // we should invoke this only one time
+                verify(channelHandlerContext, times(3)).write(any(), any());
+            }
+
+            @Test
+            void unsubsribe() throws Exception {
+                // Given
+                // we have a simple unsubscribe
+                PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcUnsubscriptionRequest.build(mockSubscriber), responseCompletableFuture);
+                // When
+                // we write this
+                SUT.write(channelHandlerContext, msg, channelPromise);
+                // then
+                // we should invoke this only one time
+                verify(channelHandlerContext, times(3)).write(any(), any());
+            }
+        }
+
+
+    }
+
+    @Nested
     class Roundtrip {
         @Nested
         class Read {