You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2018/10/29 08:09:48 UTC
[incubator-plc4x] 02/02: [driver-bases]
SingleItemToSingleRequestProtocol: added SplitConfig for granular config of
item splitting.
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
commit f257a6d24a132a1f5a3995a2a5e5a2ce8c7ed5b8
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Oct 29 08:27:07 2018 +0100
[driver-bases] SingleItemToSingleRequestProtocol: added SplitConfig for
granular config of item splitting.
---
.../java/ads/connection/AdsTcpPlcConnection.java | 2 +-
.../SingleItemToSingleRequestProtocol.java | 101 +++++++++++--
.../SingleItemToSingleRequestProtocolTest.java | 159 +++++++++++++++++++++
3 files changed, 252 insertions(+), 10 deletions(-)
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
index 5d10a01..0f74c44 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
@@ -117,7 +117,7 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc
pipeline.addLast(new Payload2TcpProtocol());
pipeline.addLast(new Ads2PayloadProtocol());
pipeline.addLast(new Plc4x2AdsProtocol(targetAmsNetId, targetAmsPort, sourceAmsNetId, sourceAmsPort, fieldMapping));
- pipeline.addLast(new SingleItemToSingleRequestProtocol(AdsTcpPlcConnection.this, AdsTcpPlcConnection.this, AdsTcpPlcConnection.this, timer));
+ pipeline.addLast(new SingleItemToSingleRequestProtocol(AdsTcpPlcConnection.this, AdsTcpPlcConnection.this, AdsTcpPlcConnection.this, timer, SingleItemToSingleRequestProtocol.SplitConfig.builder().dontSplitSubscribe().dontSplitUnsubscribe().build()));
}
};
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
index a6f1471..cbd5b1a 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
@@ -91,20 +91,27 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
private AtomicLong erroredItems;
+ private SplitConfig splitConfig;
+
public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, PlcSubscriber subscriber, Timer timer) {
- this(reader, writer, subscriber, timer, true);
+ this(reader, writer, subscriber, timer, new SplitConfig());
+ }
+
+ public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, PlcSubscriber subscriber, Timer timer, SplitConfig splitConfig) {
+ this(reader, writer, subscriber, timer, splitConfig, true);
}
- public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, PlcSubscriber subscriber, Timer timer, boolean betterImplementationPossible) {
- this(reader, writer, subscriber, timer, TimeUnit.SECONDS.toMillis(30), betterImplementationPossible);
+ public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, PlcSubscriber subscriber, Timer timer, SplitConfig splitConfig, boolean betterImplementationPossible) {
+ this(reader, writer, subscriber, timer, TimeUnit.SECONDS.toMillis(30), splitConfig, betterImplementationPossible);
}
- public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, PlcSubscriber subscriber, Timer timer, long defaultReceiveTimeout, boolean betterImplementationPossible) {
+ public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, PlcSubscriber subscriber, Timer timer, long defaultReceiveTimeout, SplitConfig splitConfig, boolean betterImplementationPossible) {
this.reader = reader;
this.writer = writer;
this.subscriber = subscriber;
this.timer = timer;
this.defaultReceiveTimeout = defaultReceiveTimeout;
+ this.splitConfig = splitConfig;
if (betterImplementationPossible) {
String callStack = Arrays.stream(Thread.currentThread().getStackTrace())
.map(StackTraceElement::toString)
@@ -281,10 +288,10 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
// Create a promise that has to be called multiple times.
PromiseCombiner promiseCombiner = new PromiseCombiner();
InternalPlcRequest request = in.getRequest();
- if (request instanceof InternalPlcFieldRequest) {
+ if (request instanceof InternalPlcFieldRequest && (splitConfig.splitRead || splitConfig.splitWrite || splitConfig.splitSubscription)) {
InternalPlcFieldRequest internalPlcFieldRequest = (InternalPlcFieldRequest) request;
- if (internalPlcFieldRequest instanceof InternalPlcReadRequest) {
+ if (internalPlcFieldRequest instanceof InternalPlcReadRequest && splitConfig.splitRead) {
InternalPlcReadRequest internalPlcReadRequest = (InternalPlcReadRequest) internalPlcFieldRequest;
internalPlcReadRequest.getNamedFields().forEach(field -> {
ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
@@ -309,7 +316,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
}
promiseCombiner.add((Future) subPromise);
});
- } else if (internalPlcFieldRequest instanceof InternalPlcWriteRequest) {
+ } else if (internalPlcFieldRequest instanceof InternalPlcWriteRequest && splitConfig.splitWrite) {
InternalPlcWriteRequest internalPlcWriteRequest = (InternalPlcWriteRequest) internalPlcFieldRequest;
internalPlcWriteRequest.getNamedFieldTriples().forEach(fieldItemTriple -> {
ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
@@ -332,7 +339,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
}
promiseCombiner.add((Future) subPromise);
});
- } else if (internalPlcFieldRequest instanceof InternalPlcSubscriptionRequest) {
+ } else if (internalPlcFieldRequest instanceof InternalPlcSubscriptionRequest && splitConfig.splitSubscription) {
InternalPlcSubscriptionRequest internalPlcSubscriptionRequest = (InternalPlcSubscriptionRequest) internalPlcFieldRequest;
internalPlcSubscriptionRequest.getNamedSubscriptionFields().forEach(field -> {
ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
@@ -360,7 +367,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
} else {
throw new PlcProtocolException("Unmapped request type " + request.getClass());
}
- } else if (request instanceof InternalPlcUnsubscriptionRequest) {
+ } else if (request instanceof InternalPlcUnsubscriptionRequest && splitConfig.splitUnsubscription) {
InternalPlcUnsubscriptionRequest internalPlcUnsubscriptionRequest = (InternalPlcUnsubscriptionRequest) request;
internalPlcUnsubscriptionRequest.getInternalPlcSubscriptionHandles().forEach(handle -> {
ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
@@ -556,4 +563,80 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
statistics.put("erroredContainers", erroredContainers.get());
return statistics;
}
+
+ public static class SplitConfig {
+ private final boolean splitRead;
+ private final boolean splitWrite;
+ private final boolean splitSubscription;
+ private final boolean splitUnsubscription;
+
+ public SplitConfig() {
+ splitRead = true;
+ splitWrite = true;
+ splitSubscription = true;
+ splitUnsubscription = true;
+ }
+
+ private SplitConfig(boolean splitRead, boolean splitWrite, boolean splitSubscription, boolean splitUnsubscription) {
+ this.splitRead = splitRead;
+ this.splitWrite = splitWrite;
+ this.splitSubscription = splitSubscription;
+ this.splitUnsubscription = splitUnsubscription;
+ }
+
+ public static SplitConfigBuilder builder() {
+ return new SplitConfigBuilder();
+ }
+
+ public static class SplitConfigBuilder {
+ private boolean splitRead = true;
+ private boolean splitWrite = true;
+ private boolean splitSubscription = true;
+ private boolean splitUnsubscription = true;
+
+ public SplitConfigBuilder splitRead() {
+ splitRead = true;
+ return this;
+ }
+
+ public SplitConfigBuilder dontSplitRead() {
+ splitRead = false;
+ return this;
+ }
+
+ public SplitConfigBuilder splitWrite() {
+ splitWrite = true;
+ return this;
+ }
+
+ public SplitConfigBuilder dontSplitWrite() {
+ splitWrite = false;
+ return this;
+ }
+
+ public SplitConfigBuilder splitSubscribe() {
+ splitSubscription = true;
+ return this;
+ }
+
+ public SplitConfigBuilder dontSplitSubscribe() {
+ splitSubscription = false;
+ return this;
+ }
+
+ public SplitConfigBuilder splitUnsubscribe() {
+ splitUnsubscription = true;
+ return this;
+ }
+
+ public SplitConfigBuilder dontSplitUnsubscribe() {
+ splitUnsubscription = false;
+ return this;
+ }
+
+ public SplitConfig build() {
+ return new SplitConfig(splitRead, splitWrite, splitSubscription, splitUnsubscription);
+ }
+ }
+ }
}
diff --git a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
index f3a3592..cd1216c 100644
--- a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
+++ b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
@@ -83,6 +83,7 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
mockSubscriber,
new HashedWheelTimer(),
TimeUnit.SECONDS.toMillis(1),
+ new SingleItemToSingleRequestProtocol.SplitConfig(),
false
);
SUT.channelRegistered(channelHandlerContext);
@@ -149,6 +150,164 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
}
@Nested
+ class SplitConfig {
+
+ @Nested
+ class SplitOn {
+ @BeforeEach
+ void setUp() throws Exception {
+ // We setup the SUT with a special configuration
+ SUT = new SingleItemToSingleRequestProtocol(
+ mockReader,
+ mockWriter,
+ mockSubscriber,
+ new HashedWheelTimer(),
+ TimeUnit.SECONDS.toMillis(1),
+ SingleItemToSingleRequestProtocol.SplitConfig.builder()
+ .dontSplitRead()
+ .dontSplitWrite()
+ .dontSplitSubscribe()
+ .dontSplitUnsubscribe()
+ .build(),
+ false
+ );
+ SUT.channelRegistered(channelHandlerContext);
+ when(channelHandlerContext.executor().inEventLoop()).thenReturn(true);
+ }
+
+ @Test
+ void read() throws Exception {
+ // Given
+ // we have a simple read
+ PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
+ // When
+ // we write this
+ SUT.write(channelHandlerContext, msg, channelPromise);
+ // then
+ // we should invoke this only one time
+ verify(channelHandlerContext, times(1)).write(eq(msg), any());
+ }
+
+ @Test
+ void write() throws Exception {
+ // Given
+ // we have a simple write
+ PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcWriteRequest.build(mockWriter), responseCompletableFuture);
+ // When
+ // we write this
+ SUT.write(channelHandlerContext, msg, channelPromise);
+ // then
+ // we should invoke this only one time
+ verify(channelHandlerContext, times(1)).write(eq(msg), any());
+ }
+
+ @Test
+ void subscribe() throws Exception {
+ // Given
+ // we have a simple subscribe
+ PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcSubscriptionRequest.build(mockSubscriber), responseCompletableFuture);
+ // When
+ // we write this
+ SUT.write(channelHandlerContext, msg, channelPromise);
+ // then
+ // we should invoke this only one time
+ verify(channelHandlerContext, times(1)).write(eq(msg), any());
+ }
+
+ @Test
+ void unsubsribe() throws Exception {
+ // Given
+ // we have a simple unsubscribe
+ PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcUnsubscriptionRequest.build(mockSubscriber), responseCompletableFuture);
+ // When
+ // we write this
+ SUT.write(channelHandlerContext, msg, channelPromise);
+ // then
+ // we should invoke this only one time
+ verify(channelHandlerContext, times(1)).write(eq(msg), any());
+ }
+ }
+
+ @Nested
+ class SplitOff {
+ @BeforeEach
+ void setUp() throws Exception {
+ // We setup the SUT with a special configuration
+ SUT = new SingleItemToSingleRequestProtocol(
+ mockReader,
+ mockWriter,
+ mockSubscriber,
+ new HashedWheelTimer(),
+ TimeUnit.SECONDS.toMillis(1),
+ SingleItemToSingleRequestProtocol.SplitConfig.builder()
+ .splitRead()
+ .splitWrite()
+ .splitSubscribe()
+ .splitUnsubscribe()
+ .build(),
+ false
+ );
+ SUT.channelRegistered(channelHandlerContext);
+ when(channelHandlerContext.executor().inEventLoop()).thenReturn(true);
+ }
+
+ @Test
+ void read() throws Exception {
+ // Given
+ // we have a simple read
+ PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
+ // When
+ // we write this
+ SUT.write(channelHandlerContext, msg, channelPromise);
+ // then
+ // we should invoke this only one time
+ verify(channelHandlerContext, times(5)).write(any(), any());
+ }
+
+ @Test
+ void write() throws Exception {
+ // Given
+ // we have a simple write
+ PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcWriteRequest.build(mockWriter), responseCompletableFuture);
+ // When
+ // we write this
+ SUT.write(channelHandlerContext, msg, channelPromise);
+ // then
+ // we should invoke this only one time
+ verify(channelHandlerContext, times(5)).write(any(), any());
+ }
+
+ @Test
+ void subscribe() throws Exception {
+ // Given
+ // we have a simple subscribe
+ PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcSubscriptionRequest.build(mockSubscriber), responseCompletableFuture);
+ // When
+ // we write this
+ SUT.write(channelHandlerContext, msg, channelPromise);
+ // then
+ // we should invoke this only one time
+ verify(channelHandlerContext, times(3)).write(any(), any());
+ }
+
+ @Test
+ void unsubsribe() throws Exception {
+ // Given
+ // we have a simple unsubscribe
+ PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcUnsubscriptionRequest.build(mockSubscriber), responseCompletableFuture);
+ // When
+ // we write this
+ SUT.write(channelHandlerContext, msg, channelPromise);
+ // then
+ // we should invoke this only one time
+ verify(channelHandlerContext, times(3)).write(any(), any());
+ }
+ }
+
+
+ }
+
+ @Nested
class Roundtrip {
@Nested
class Read {