You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2018/10/29 08:09:48 UTC

[incubator-plc4x] 02/02: [driver-bases] SingleItemToSingleRequestProtocol: added SplitConfig for granular config of item splitting.

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit f257a6d24a132a1f5a3995a2a5e5a2ce8c7ed5b8
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Oct 29 08:27:07 2018 +0100

    [driver-bases] SingleItemToSingleRequestProtocol: added SplitConfig for
    granular config of item splitting.
---
 .../java/ads/connection/AdsTcpPlcConnection.java   |   2 +-
 .../SingleItemToSingleRequestProtocol.java         | 101 +++++++++++--
 .../SingleItemToSingleRequestProtocolTest.java     | 159 +++++++++++++++++++++
 3 files changed, 252 insertions(+), 10 deletions(-)

diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
index 5d10a01..0f74c44 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
@@ -117,7 +117,7 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc
                 pipeline.addLast(new Payload2TcpProtocol());
                 pipeline.addLast(new Ads2PayloadProtocol());
                 pipeline.addLast(new Plc4x2AdsProtocol(targetAmsNetId, targetAmsPort, sourceAmsNetId, sourceAmsPort, fieldMapping));
-                pipeline.addLast(new SingleItemToSingleRequestProtocol(AdsTcpPlcConnection.this, AdsTcpPlcConnection.this, AdsTcpPlcConnection.this, timer));
+                pipeline.addLast(new SingleItemToSingleRequestProtocol(AdsTcpPlcConnection.this, AdsTcpPlcConnection.this, AdsTcpPlcConnection.this, timer, SingleItemToSingleRequestProtocol.SplitConfig.builder().dontSplitSubscribe().dontSplitUnsubscribe().build()));
             }
         };
     }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
index a6f1471..cbd5b1a 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
@@ -91,20 +91,27 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
 
     private AtomicLong erroredItems;
 
+    private SplitConfig splitConfig;
+
     public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, PlcSubscriber subscriber, Timer timer) {
-        this(reader, writer, subscriber, timer, true);
+        this(reader, writer, subscriber, timer, new SplitConfig());
+    }
+
+    public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, PlcSubscriber subscriber, Timer timer, SplitConfig splitConfig) {
+        this(reader, writer, subscriber, timer, splitConfig, true);
     }
 
-    public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, PlcSubscriber subscriber, Timer timer, boolean betterImplementationPossible) {
-        this(reader, writer, subscriber, timer, TimeUnit.SECONDS.toMillis(30), betterImplementationPossible);
+    public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, PlcSubscriber subscriber, Timer timer, SplitConfig splitConfig, boolean betterImplementationPossible) {
+        this(reader, writer, subscriber, timer, TimeUnit.SECONDS.toMillis(30), splitConfig, betterImplementationPossible);
     }
 
-    public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, PlcSubscriber subscriber, Timer timer, long defaultReceiveTimeout, boolean betterImplementationPossible) {
+    public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, PlcSubscriber subscriber, Timer timer, long defaultReceiveTimeout, SplitConfig splitConfig, boolean betterImplementationPossible) {
         this.reader = reader;
         this.writer = writer;
         this.subscriber = subscriber;
         this.timer = timer;
         this.defaultReceiveTimeout = defaultReceiveTimeout;
+        this.splitConfig = splitConfig;
         if (betterImplementationPossible) {
             String callStack = Arrays.stream(Thread.currentThread().getStackTrace())
                 .map(StackTraceElement::toString)
@@ -281,10 +288,10 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
             // Create a promise that has to be called multiple times.
             PromiseCombiner promiseCombiner = new PromiseCombiner();
             InternalPlcRequest request = in.getRequest();
-            if (request instanceof InternalPlcFieldRequest) {
+            if (request instanceof InternalPlcFieldRequest && (splitConfig.splitRead || splitConfig.splitWrite || splitConfig.splitSubscription)) {
                 InternalPlcFieldRequest internalPlcFieldRequest = (InternalPlcFieldRequest) request;
 
-                if (internalPlcFieldRequest instanceof InternalPlcReadRequest) {
+                if (internalPlcFieldRequest instanceof InternalPlcReadRequest && splitConfig.splitRead) {
                     InternalPlcReadRequest internalPlcReadRequest = (InternalPlcReadRequest) internalPlcFieldRequest;
                     internalPlcReadRequest.getNamedFields().forEach(field -> {
                         ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
@@ -309,7 +316,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
                         }
                         promiseCombiner.add((Future) subPromise);
                     });
-                } else if (internalPlcFieldRequest instanceof InternalPlcWriteRequest) {
+                } else if (internalPlcFieldRequest instanceof InternalPlcWriteRequest && splitConfig.splitWrite) {
                     InternalPlcWriteRequest internalPlcWriteRequest = (InternalPlcWriteRequest) internalPlcFieldRequest;
                     internalPlcWriteRequest.getNamedFieldTriples().forEach(fieldItemTriple -> {
                         ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
@@ -332,7 +339,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
                         }
                         promiseCombiner.add((Future) subPromise);
                     });
-                } else if (internalPlcFieldRequest instanceof InternalPlcSubscriptionRequest) {
+                } else if (internalPlcFieldRequest instanceof InternalPlcSubscriptionRequest && splitConfig.splitSubscription) {
                     InternalPlcSubscriptionRequest internalPlcSubscriptionRequest = (InternalPlcSubscriptionRequest) internalPlcFieldRequest;
                     internalPlcSubscriptionRequest.getNamedSubscriptionFields().forEach(field -> {
                         ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
@@ -360,7 +367,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
                 } else {
                     throw new PlcProtocolException("Unmapped request type " + request.getClass());
                 }
-            } else if (request instanceof InternalPlcUnsubscriptionRequest) {
+            } else if (request instanceof InternalPlcUnsubscriptionRequest && splitConfig.splitUnsubscription) {
                 InternalPlcUnsubscriptionRequest internalPlcUnsubscriptionRequest = (InternalPlcUnsubscriptionRequest) request;
                 internalPlcUnsubscriptionRequest.getInternalPlcSubscriptionHandles().forEach(handle -> {
                     ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
@@ -556,4 +563,80 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
         statistics.put("erroredContainers", erroredContainers.get());
         return statistics;
     }
+
+    public static class SplitConfig {
+        private final boolean splitRead;
+        private final boolean splitWrite;
+        private final boolean splitSubscription;
+        private final boolean splitUnsubscription;
+
+        public SplitConfig() {
+            splitRead = true;
+            splitWrite = true;
+            splitSubscription = true;
+            splitUnsubscription = true;
+        }
+
+        private SplitConfig(boolean splitRead, boolean splitWrite, boolean splitSubscription, boolean splitUnsubscription) {
+            this.splitRead = splitRead;
+            this.splitWrite = splitWrite;
+            this.splitSubscription = splitSubscription;
+            this.splitUnsubscription = splitUnsubscription;
+        }
+
+        public static SplitConfigBuilder builder() {
+            return new SplitConfigBuilder();
+        }
+
+        public static class SplitConfigBuilder {
+            private boolean splitRead = true;
+            private boolean splitWrite = true;
+            private boolean splitSubscription = true;
+            private boolean splitUnsubscription = true;
+
+            public SplitConfigBuilder splitRead() {
+                splitRead = true;
+                return this;
+            }
+
+            public SplitConfigBuilder dontSplitRead() {
+                splitRead = false;
+                return this;
+            }
+
+            public SplitConfigBuilder splitWrite() {
+                splitWrite = true;
+                return this;
+            }
+
+            public SplitConfigBuilder dontSplitWrite() {
+                splitWrite = false;
+                return this;
+            }
+
+            public SplitConfigBuilder splitSubscribe() {
+                splitSubscription = true;
+                return this;
+            }
+
+            public SplitConfigBuilder dontSplitSubscribe() {
+                splitSubscription = false;
+                return this;
+            }
+
+            public SplitConfigBuilder splitUnsubscribe() {
+                splitUnsubscription = true;
+                return this;
+            }
+
+            public SplitConfigBuilder dontSplitUnsubscribe() {
+                splitUnsubscription = false;
+                return this;
+            }
+
+            public SplitConfig build() {
+                return new SplitConfig(splitRead, splitWrite, splitSubscription, splitUnsubscription);
+            }
+        }
+    }
 }
diff --git a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
index f3a3592..cd1216c 100644
--- a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
+++ b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
@@ -83,6 +83,7 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
             mockSubscriber,
             new HashedWheelTimer(),
             TimeUnit.SECONDS.toMillis(1),
+            new SingleItemToSingleRequestProtocol.SplitConfig(),
             false
         );
         SUT.channelRegistered(channelHandlerContext);
@@ -149,6 +150,164 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
     }
 
     @Nested
+    class SplitConfig {
+
+        @Nested
+        class SplitOn {
+            @BeforeEach
+            void setUp() throws Exception {
+                // We setup the SUT with a special configuration
+                SUT = new SingleItemToSingleRequestProtocol(
+                    mockReader,
+                    mockWriter,
+                    mockSubscriber,
+                    new HashedWheelTimer(),
+                    TimeUnit.SECONDS.toMillis(1),
+                    SingleItemToSingleRequestProtocol.SplitConfig.builder()
+                        .dontSplitRead()
+                        .dontSplitWrite()
+                        .dontSplitSubscribe()
+                        .dontSplitUnsubscribe()
+                        .build(),
+                    false
+                );
+                SUT.channelRegistered(channelHandlerContext);
+                when(channelHandlerContext.executor().inEventLoop()).thenReturn(true);
+            }
+
+            @Test
+            void read() throws Exception {
+                // Given
+                // we have a simple read
+                PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
+                // When
+                // we write this
+                SUT.write(channelHandlerContext, msg, channelPromise);
+                // then
+                // we should invoke this only one time
+                verify(channelHandlerContext, times(1)).write(eq(msg), any());
+            }
+
+            @Test
+            void write() throws Exception {
+                // Given
+                // we have a simple write
+                PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcWriteRequest.build(mockWriter), responseCompletableFuture);
+                // When
+                // we write this
+                SUT.write(channelHandlerContext, msg, channelPromise);
+                // then
+                // we should invoke this only one time
+                verify(channelHandlerContext, times(1)).write(eq(msg), any());
+            }
+
+            @Test
+            void subscribe() throws Exception {
+                // Given
+                // we have a simple subscribe
+                PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcSubscriptionRequest.build(mockSubscriber), responseCompletableFuture);
+                // When
+                // we write this
+                SUT.write(channelHandlerContext, msg, channelPromise);
+                // then
+                // we should invoke this only one time
+                verify(channelHandlerContext, times(1)).write(eq(msg), any());
+            }
+
+            @Test
+            void unsubsribe() throws Exception {
+                // Given
+                // we have a simple unsubscribe
+                PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcUnsubscriptionRequest.build(mockSubscriber), responseCompletableFuture);
+                // When
+                // we write this
+                SUT.write(channelHandlerContext, msg, channelPromise);
+                // then
+                // we should invoke this only one time
+                verify(channelHandlerContext, times(1)).write(eq(msg), any());
+            }
+        }
+
+        @Nested
+        class SplitOff {
+            @BeforeEach
+            void setUp() throws Exception {
+                // We setup the SUT with a special configuration
+                SUT = new SingleItemToSingleRequestProtocol(
+                    mockReader,
+                    mockWriter,
+                    mockSubscriber,
+                    new HashedWheelTimer(),
+                    TimeUnit.SECONDS.toMillis(1),
+                    SingleItemToSingleRequestProtocol.SplitConfig.builder()
+                        .splitRead()
+                        .splitWrite()
+                        .splitSubscribe()
+                        .splitUnsubscribe()
+                        .build(),
+                    false
+                );
+                SUT.channelRegistered(channelHandlerContext);
+                when(channelHandlerContext.executor().inEventLoop()).thenReturn(true);
+            }
+
+            @Test
+            void read() throws Exception {
+                // Given
+                // we have a simple read
+                PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
+                // When
+                // we write this
+                SUT.write(channelHandlerContext, msg, channelPromise);
+                // then
+                // we should invoke this only one time
+                verify(channelHandlerContext, times(5)).write(any(), any());
+            }
+
+            @Test
+            void write() throws Exception {
+                // Given
+                // we have a simple write
+                PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcWriteRequest.build(mockWriter), responseCompletableFuture);
+                // When
+                // we write this
+                SUT.write(channelHandlerContext, msg, channelPromise);
+                // then
+                // we should invoke this only one time
+                verify(channelHandlerContext, times(5)).write(any(), any());
+            }
+
+            @Test
+            void subscribe() throws Exception {
+                // Given
+                // we have a simple subscribe
+                PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcSubscriptionRequest.build(mockSubscriber), responseCompletableFuture);
+                // When
+                // we write this
+                SUT.write(channelHandlerContext, msg, channelPromise);
+                // then
+                // we should invoke this only one time
+                verify(channelHandlerContext, times(3)).write(any(), any());
+            }
+
+            @Test
+            void unsubsribe() throws Exception {
+                // Given
+                // we have a simple unsubscribe
+                PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcUnsubscriptionRequest.build(mockSubscriber), responseCompletableFuture);
+                // When
+                // we write this
+                SUT.write(channelHandlerContext, msg, channelPromise);
+                // then
+                // we should invoke this only one time
+                verify(channelHandlerContext, times(3)).write(any(), any());
+            }
+        }
+
+
+    }
+
+    @Nested
     class Roundtrip {
         @Nested
         class Read {