You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2018/10/27 12:43:14 UTC
[incubator-plc4x] branch master updated: [driver-bases] re-enabled
subscription support on SingleItemToSingleRequestProtocol; un-sub still
needs to be re-enabled
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
The following commit(s) were added to refs/heads/master by this push:
new 342fb75 [driver-bases] re-enabled subscription support on SingleItemToSingleRequestProtocol; un-sub still needs to be re-enabled
342fb75 is described below
commit 342fb753674310df157046ed5b253b560e50f81c
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Sat Oct 27 14:40:40 2018 +0200
[driver-bases] re-enabled subscription support on SingleItemToSingleRequestProtocol
un-sub still needs to be re-enabled
---
.../ads/connection/AdsSerialPlcConnection.java | 5 +-
.../java/ads/connection/AdsTcpPlcConnection.java | 2 +-
.../messages/DefaultPlcSubscriptionRequest.java | 10 +-
.../messages/DefaultPlcSubscriptionResponse.java | 4 +
.../base/messages/InternalPlcFieldRequest.java | 2 +-
.../messages/InternalPlcSubscriptionRequest.java | 3 +
.../messages/InternalPlcSubscriptionResponse.java | 7 +-
.../SingleItemToSingleRequestProtocol.java | 73 +++-
.../SingleItemToSingleRequestProtocolTest.java | 393 ++++++++++++---------
.../modbus/connection/ModbusTcpPlcConnection.java | 5 +-
10 files changed, 329 insertions(+), 175 deletions(-)
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsSerialPlcConnection.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsSerialPlcConnection.java
index 3d4c8ba..d9094fa 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsSerialPlcConnection.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsSerialPlcConnection.java
@@ -28,12 +28,9 @@ import org.apache.plc4x.java.ads.protocol.Ads2PayloadProtocol;
import org.apache.plc4x.java.ads.protocol.Payload2SerialProtocol;
import org.apache.plc4x.java.ads.protocol.Plc4x2AdsProtocol;
import org.apache.plc4x.java.ads.protocol.util.SingleMessageRateLimiter;
-import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
-import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
import org.apache.plc4x.java.base.connection.SerialChannelFactory;
import org.apache.plc4x.java.base.protocol.SingleItemToSingleRequestProtocol;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
public class AdsSerialPlcConnection extends AdsAbstractPlcConnection {
@@ -65,7 +62,7 @@ public class AdsSerialPlcConnection extends AdsAbstractPlcConnection {
pipeline.addLast(new SingleMessageRateLimiter());
pipeline.addLast(new Ads2PayloadProtocol());
pipeline.addLast(new Plc4x2AdsProtocol(targetAmsNetId, targetAmsPort, sourceAmsNetId, sourceAmsPort, fieldMapping));
- pipeline.addLast(new SingleItemToSingleRequestProtocol(AdsSerialPlcConnection.this, AdsSerialPlcConnection.this, timer));
+ pipeline.addLast(new SingleItemToSingleRequestProtocol(AdsSerialPlcConnection.this, AdsSerialPlcConnection.this, null, timer));
}
};
}
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
index 06aa579..cbd5a5d 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
@@ -117,7 +117,7 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc
pipeline.addLast(new Payload2TcpProtocol());
pipeline.addLast(new Ads2PayloadProtocol());
pipeline.addLast(new Plc4x2AdsProtocol(targetAmsNetId, targetAmsPort, sourceAmsNetId, sourceAmsPort, fieldMapping));
- pipeline.addLast(new SingleItemToSingleRequestProtocol(AdsTcpPlcConnection.this, AdsTcpPlcConnection.this, timer));
+ pipeline.addLast(new SingleItemToSingleRequestProtocol(AdsTcpPlcConnection.this, AdsTcpPlcConnection.this, AdsTcpPlcConnection.this, timer));
}
};
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
index 599e4ad..13392ae 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
@@ -85,7 +85,15 @@ public class DefaultPlcSubscriptionRequest implements InternalPlcSubscriptionReq
public LinkedList<Pair<String, PlcField>> getNamedFields() {
return fields.entrySet()
.stream()
- .map(stringPlcFieldEntry -> Pair.of(stringPlcFieldEntry.getKey(), (PlcField) stringPlcFieldEntry.getValue()))
+ .map(stringPlcFieldEntry -> Pair.of(stringPlcFieldEntry.getKey(), stringPlcFieldEntry.getValue().getPlcField()))
+ .collect(Collectors.toCollection(LinkedList::new));
+ }
+
+ @Override
+ public LinkedList<Pair<String, SubscriptionPlcField>> getNamedSubscriptionFields() {
+ return fields.entrySet()
+ .stream()
+ .map(stringPlcFieldEntry -> Pair.of(stringPlcFieldEntry.getKey(), stringPlcFieldEntry.getValue()))
.collect(Collectors.toCollection(LinkedList::new));
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionResponse.java
index 47a5ce2..3a2a0cb 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionResponse.java
@@ -82,4 +82,8 @@ public class DefaultPlcSubscriptionResponse implements InternalPlcSubscriptionRe
return values.values().stream().map(Pair::getValue).collect(Collectors.toList());
}
+ @Override
+ public Map<String, Pair<PlcResponseCode, PlcSubscriptionHandle>> getValues() {
+ return values;
+ }
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcFieldRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcFieldRequest.java
index a7ee30c..30c440e 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcFieldRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcFieldRequest.java
@@ -24,7 +24,7 @@ import org.apache.plc4x.java.api.model.PlcField;
import java.util.LinkedList;
-public interface InternalPlcFieldRequest extends PlcFieldRequest {
+public interface InternalPlcFieldRequest extends PlcFieldRequest, InternalPlcRequest {
LinkedList<Pair<String, PlcField>> getNamedFields();
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionRequest.java
index 6dcb30d..8901fef 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionRequest.java
@@ -18,6 +18,7 @@
*/
package org.apache.plc4x.java.base.messages;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
import org.apache.plc4x.java.base.model.SubscriptionPlcField;
@@ -29,4 +30,6 @@ public interface InternalPlcSubscriptionRequest extends PlcSubscriptionRequest,
LinkedList<SubscriptionPlcField> getSubscriptionFields();
LinkedHashMap<String, SubscriptionPlcField> getSubscriptionPlcFieldMap();
+
+ LinkedList<Pair<String, SubscriptionPlcField>> getNamedSubscriptionFields();
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionResponse.java
index 86e19d6..561a99a 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionResponse.java
@@ -18,8 +18,13 @@
*/
package org.apache.plc4x.java.base.messages;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
+import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
-public interface InternalPlcSubscriptionResponse extends PlcSubscriptionResponse {
+import java.util.Map;
+public interface InternalPlcSubscriptionResponse extends PlcSubscriptionResponse, InternalPlcResponse {
+ Map<String, Pair<PlcResponseCode, PlcSubscriptionHandle>> getValues();
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
index 7ef5bdc..22c210e 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
@@ -28,9 +28,11 @@ import org.apache.commons.lang3.tuple.Triple;
import org.apache.plc4x.java.api.exceptions.PlcProtocolException;
import org.apache.plc4x.java.api.exceptions.PlcTimeoutException;
import org.apache.plc4x.java.api.model.PlcField;
+import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.java.base.messages.*;
import org.apache.plc4x.java.base.messages.items.BaseDefaultFieldItem;
+import org.apache.plc4x.java.base.model.SubscriptionPlcField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,8 +52,11 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
private final Timer timer;
private final PlcReader reader;
+
private final PlcWriter writer;
+ private final PlcSubscriber subscriber;
+
// TODO: maybe better get from map
private long defaultReceiveTimeout;
@@ -84,17 +89,18 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
private AtomicLong erroredItems;
- public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, Timer timer) {
- this(reader, writer, timer, true);
+ public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, PlcSubscriber subscriber, Timer timer) {
+ this(reader, writer, subscriber, timer, true);
}
- public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, Timer timer, boolean betterImplementationPossible) {
- this(reader, writer, timer, TimeUnit.SECONDS.toMillis(30), betterImplementationPossible);
+ public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, PlcSubscriber subscriber, Timer timer, boolean betterImplementationPossible) {
+ this(reader, writer, subscriber, timer, TimeUnit.SECONDS.toMillis(30), betterImplementationPossible);
}
- public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, Timer timer, long defaultReceiveTimeout, boolean betterImplementationPossible) {
+ public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, PlcSubscriber subscriber, Timer timer, long defaultReceiveTimeout, boolean betterImplementationPossible) {
this.reader = reader;
this.writer = writer;
+ this.subscriber = subscriber;
this.timer = timer;
this.defaultReceiveTimeout = defaultReceiveTimeout;
if (betterImplementationPossible) {
@@ -201,6 +207,16 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
.forEach(stringPairMap -> stringPairMap.forEach(values::put));
plcResponse = new DefaultPlcWriteResponse(internalPlcWriteRequest, values);
+ } else if (originalPlcRequestContainer.getRequest() instanceof InternalPlcSubscriptionRequest) {
+ InternalPlcSubscriptionRequest internalPlcSubscriptionRequest = (InternalPlcSubscriptionRequest) originalPlcRequestContainer.getRequest();
+ HashMap<String, Pair<PlcResponseCode, PlcSubscriptionHandle>> fields = new HashMap<>();
+
+ correlatedResponseItems.stream()
+ .map(InternalPlcSubscriptionResponse.class::cast)
+ .map(InternalPlcSubscriptionResponse::getValues)
+ .forEach(stringPairMap -> stringPairMap.forEach(fields::put));
+
+ plcResponse = new DefaultPlcSubscriptionResponse(internalPlcSubscriptionRequest, fields);
} else {
errored(currentTdpu, new PlcProtocolException("Unknown type detected " + originalPlcRequestContainer.getRequest().getClass()), originalResponseFuture);
return;
@@ -312,6 +328,32 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
promiseCombiner.add((Future) subPromise);
});
// TODO: add sub/unsub
+ } else if (internalPlcFieldRequest instanceof InternalPlcSubscriptionRequest) {
+ InternalPlcSubscriptionRequest internalPlcSubscriptionRequest = (InternalPlcSubscriptionRequest) internalPlcFieldRequest;
+ internalPlcSubscriptionRequest.getNamedSubscriptionFields().forEach(field -> {
+ ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
+
+ Integer tdpu = correlationIdGenerator.getAndIncrement();
+ CompletableFuture<InternalPlcResponse> correlatedCompletableFuture = new CompletableFuture<>();
+ // Important: don't chain to above as we want the above to be completed not the result of when complete
+ correlatedCompletableFuture
+ .thenApply(InternalPlcResponse.class::cast)
+ .whenComplete((internalPlcResponse, throwable) -> {
+ if (throwable != null) {
+ errored(tdpu, throwable, in.getResponseFuture());
+ } else {
+ tryFinish(tdpu, internalPlcResponse, in.getResponseFuture());
+ }
+ });
+ PlcRequestContainer<CorrelatedPlcSubscriptionRequest, InternalPlcResponse> correlatedPlcRequestContainer = new PlcRequestContainer<>(CorrelatedPlcSubscriptionRequest.of(subscriber, field, tdpu), correlatedCompletableFuture);
+ correlationToParentContainer.put(tdpu, in);
+ queue.add(correlatedPlcRequestContainer, subPromise);
+ if (!tdpus.add(tdpu)) {
+ throw new IllegalStateException("AtomicInteger should not create duplicated ids: " + tdpu);
+ }
+ promiseCombiner.add((Future) subPromise);
+ });
+ // TODO: add sub/unsub
} else {
throw new PlcProtocolException("Unmapped request type " + request.getClass());
}
@@ -429,6 +471,27 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
}
}
+ protected static class CorrelatedPlcSubscriptionRequest extends DefaultPlcSubscriptionRequest implements CorrelatedPlcRequest {
+
+ protected final int tdpu;
+
+ protected CorrelatedPlcSubscriptionRequest(PlcSubscriber subscriber, LinkedHashMap<String, SubscriptionPlcField> fields, int tdpu) {
+ super(subscriber, fields);
+ this.tdpu = tdpu;
+ }
+
+ protected static CorrelatedPlcSubscriptionRequest of(PlcSubscriber subscriber, Pair<String, SubscriptionPlcField> stringPlcFieldPair, int tdpu) {
+ LinkedHashMap<String, SubscriptionPlcField> fields = new LinkedHashMap<>();
+ fields.put(stringPlcFieldPair.getKey(), stringPlcFieldPair.getValue());
+ return new CorrelatedPlcSubscriptionRequest(subscriber, fields, tdpu);
+ }
+
+ @Override
+ public int getTdpu() {
+ return tdpu;
+ }
+ }
+
// TODO: maybe export to jmx
public Map<String, Number> getStatistics() {
HashMap<String, Number> statistics = new HashMap<>();
diff --git a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
index ebf266f..fb85d2b 100644
--- a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
+++ b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
@@ -27,6 +27,7 @@ import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.plc4x.java.api.messages.PlcFieldRequest;
import org.apache.plc4x.java.api.model.PlcField;
+import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.java.base.messages.*;
import org.apache.plc4x.java.base.messages.items.BaseDefaultFieldItem;
@@ -38,7 +39,10 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.*;
+import org.mockito.Answers;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.*;
@@ -53,18 +57,7 @@ import static org.mockito.Mockito.*;
@ExtendWith(MockitoExtension.class)
class SingleItemToSingleRequestProtocolTest implements WithAssertions {
- PlcReader mockReader = null;
- PlcWriter mockWriter = null;
- PlcSubscriber mockSubscriber = null;
-
- @InjectMocks
- SingleItemToSingleRequestProtocol SUT = new SingleItemToSingleRequestProtocol(
- mockReader,
- mockWriter,
- new HashedWheelTimer(),
- TimeUnit.SECONDS.toMillis(1),
- false
- );
+ SingleItemToSingleRequestProtocol SUT;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
ChannelHandlerContext channelHandlerContext;
@@ -75,8 +68,23 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
@Mock
CompletableFuture<InternalPlcResponse> responseCompletableFuture;
+ @Mock
+ PlcReader mockReader;
+ @Mock
+ PlcWriter mockWriter;
+ @Mock
+ PlcSubscriber mockSubscriber;
+
@BeforeEach
void setUp() throws Exception {
+ SUT = new SingleItemToSingleRequestProtocol(
+ mockReader,
+ mockWriter,
+ mockSubscriber,
+ new HashedWheelTimer(),
+ TimeUnit.SECONDS.toMillis(1),
+ false
+ );
SUT.channelRegistered(channelHandlerContext);
when(channelHandlerContext.executor().inEventLoop()).thenReturn(true);
}
@@ -142,152 +150,215 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
@Nested
class Roundtrip {
- @Captor
- ArgumentCaptor<PlcRequestContainer> plcRequestContainerArgumentCaptor;
-
- @Test
- void simpleRead() throws Exception {
- // Given
- // we have a simple read
- PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
- // When
- // we write this
- SUT.write(channelHandlerContext, msg, channelPromise);
- // And
- // and we simulate that all get responded
- verify(channelHandlerContext, times(5)).write(plcRequestContainerArgumentCaptor.capture(), any());
- List<PlcRequestContainer> capturedDownstreamContainers = plcRequestContainerArgumentCaptor.getAllValues();
- capturedDownstreamContainers.forEach(this::produceReadResponse);
- // Then
- // our complete container should complete normally
- verify(responseCompletableFuture).complete(any());
- // And we should have no memory leak
- assertThat(SUT.getStatistics()).containsOnly(
- entry("queue", 0),
- entry("sentButUnacknowledgedSubContainer", 0),
- entry("correlationToParentContainer", 0),
- entry("containerCorrelationIdMap", 0),
- entry("responsesToBeDelivered", 0),
- entry("correlationIdGenerator", 5),
- entry("erroredItems", 0L),
- entry("deliveredItems", 5L),
- entry("deliveredContainers", 1L),
- entry("erroredContainers", 0L)
- );
- }
-
- @Test
- void partialRead() throws Exception {
- // Given
- // we have a simple read
- PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
- // When
- // we write this
- SUT.write(channelHandlerContext, msg, channelPromise);
- // And
- // and we simulate that some one responded
- verify(channelHandlerContext, times(5)).write(plcRequestContainerArgumentCaptor.capture(), any());
- List<PlcRequestContainer> capturedDownstreamContainers = plcRequestContainerArgumentCaptor.getAllValues();
- capturedDownstreamContainers.stream().findFirst().map(this::produceReadResponse);
- // Then
- // We create SUT with 1 seconds timeout
- TimeUnit.SECONDS.sleep(2);
- // our complete container should complete normally
- verify(responseCompletableFuture).completeExceptionally(any());
- // And we should have no memory leak
- assertThat(SUT.getStatistics()).containsOnly(
- entry("queue", 0),
- entry("sentButUnacknowledgedSubContainer", 0),
- entry("correlationToParentContainer", 0),
- entry("containerCorrelationIdMap", 0),
- entry("responsesToBeDelivered", 0),
- entry("correlationIdGenerator", 5),
- entry("deliveredItems", 1L),
- entry("erroredItems", 4L),
- entry("deliveredContainers", 0L),
- entry("erroredContainers", 1L)
- );
- }
-
- @Test
- void partialReadOneErrored() throws Exception {
- // Given
- // we have a simple read
- PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
- // When
- // we write this
- SUT.write(channelHandlerContext, msg, channelPromise);
- // And
- // and we simulate that some one responded
- verify(channelHandlerContext, times(5)).write(plcRequestContainerArgumentCaptor.capture(), any());
- List<PlcRequestContainer> capturedDownstreamContainers = plcRequestContainerArgumentCaptor.getAllValues();
- capturedDownstreamContainers.stream()
- .findFirst()
- .map(plcRequestContainer ->
- plcRequestContainer
- .getResponseFuture()
- .completeExceptionally(new RuntimeException("ErrorOccurred"))
+ @Nested
+ class Read {
+ @Captor
+ ArgumentCaptor<PlcRequestContainer> plcRequestContainerArgumentCaptor;
+
+ @Test
+ void simpleRead() throws Exception {
+ // Given
+ // we have a simple read
+ PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
+ // When
+ // we write this
+ SUT.write(channelHandlerContext, msg, channelPromise);
+ // And
+ // and we simulate that all get responded
+ verify(channelHandlerContext, times(5)).write(plcRequestContainerArgumentCaptor.capture(), any());
+ List<PlcRequestContainer> capturedDownstreamContainers = plcRequestContainerArgumentCaptor.getAllValues();
+ capturedDownstreamContainers.forEach(this::produceReadResponse);
+ // Then
+ // our complete container should complete normally
+ verify(responseCompletableFuture).complete(any());
+ // And we should have no memory leak
+ assertThat(SUT.getStatistics()).containsOnly(
+ entry("queue", 0),
+ entry("sentButUnacknowledgedSubContainer", 0),
+ entry("correlationToParentContainer", 0),
+ entry("containerCorrelationIdMap", 0),
+ entry("responsesToBeDelivered", 0),
+ entry("correlationIdGenerator", 5),
+ entry("erroredItems", 0L),
+ entry("deliveredItems", 5L),
+ entry("deliveredContainers", 1L),
+ entry("erroredContainers", 0L)
);
- // Then
- // We create SUT with 1 seconds timeout
- TimeUnit.SECONDS.sleep(2);
- // our complete container should complete normally
- verify(responseCompletableFuture).completeExceptionally(any());
- // And we should have no memory leak
- assertThat(SUT.getStatistics()).containsOnly(
- entry("queue", 0),
- entry("sentButUnacknowledgedSubContainer", 0),
- entry("correlationToParentContainer", 0),
- entry("containerCorrelationIdMap", 0),
- entry("responsesToBeDelivered", 0),
- entry("correlationIdGenerator", 5),
- entry("deliveredItems", 0L),
- entry("erroredItems", 1L),
- entry("deliveredContainers", 0L),
- entry("erroredContainers", 1L)
- );
+ }
+
+ @Test
+ void partialRead() throws Exception {
+ // Given
+ // we have a simple read
+ PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
+ // When
+ // we write this
+ SUT.write(channelHandlerContext, msg, channelPromise);
+ // And
+ // and we simulate that some one responded
+ verify(channelHandlerContext, times(5)).write(plcRequestContainerArgumentCaptor.capture(), any());
+ List<PlcRequestContainer> capturedDownstreamContainers = plcRequestContainerArgumentCaptor.getAllValues();
+ capturedDownstreamContainers.stream().findFirst().map(this::produceReadResponse);
+ // Then
+ // We create SUT with 1 seconds timeout
+ TimeUnit.SECONDS.sleep(2);
+ // our complete container should complete exceptionally
+ verify(responseCompletableFuture).completeExceptionally(any());
+ // And we should have no memory leak
+ assertThat(SUT.getStatistics()).containsOnly(
+ entry("queue", 0),
+ entry("sentButUnacknowledgedSubContainer", 0),
+ entry("correlationToParentContainer", 0),
+ entry("containerCorrelationIdMap", 0),
+ entry("responsesToBeDelivered", 0),
+ entry("correlationIdGenerator", 5),
+ entry("deliveredItems", 1L),
+ entry("erroredItems", 4L),
+ entry("deliveredContainers", 0L),
+ entry("erroredContainers", 1L)
+ );
+ }
+
+ @Test
+ void partialReadOneErrored() throws Exception {
+ // Given
+ // we have a simple read
+ PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
+ // When
+ // we write this
+ SUT.write(channelHandlerContext, msg, channelPromise);
+ // And
+ // and we simulate that some one responded
+ verify(channelHandlerContext, times(5)).write(plcRequestContainerArgumentCaptor.capture(), any());
+ List<PlcRequestContainer> capturedDownstreamContainers = plcRequestContainerArgumentCaptor.getAllValues();
+ capturedDownstreamContainers.stream()
+ .findFirst()
+ .map(plcRequestContainer ->
+ plcRequestContainer
+ .getResponseFuture()
+ .completeExceptionally(new RuntimeException("ErrorOccurred"))
+ );
+ // Then
+ // We create SUT with 1 seconds timeout
+ TimeUnit.SECONDS.sleep(2);
+ // our complete container should complete exceptionally
+ verify(responseCompletableFuture).completeExceptionally(any());
+ // And we should have no memory leak
+ assertThat(SUT.getStatistics()).containsOnly(
+ entry("queue", 0),
+ entry("sentButUnacknowledgedSubContainer", 0),
+ entry("correlationToParentContainer", 0),
+ entry("containerCorrelationIdMap", 0),
+ entry("responsesToBeDelivered", 0),
+ entry("correlationIdGenerator", 5),
+ entry("deliveredItems", 0L),
+ entry("erroredItems", 1L),
+ entry("deliveredContainers", 0L),
+ entry("erroredContainers", 1L)
+ );
+ }
+
+ @Test
+ void noRead() throws Exception {
+ // Given
+ // we have a simple read
+ PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
+ // When
+ // we write this
+ SUT.write(channelHandlerContext, msg, channelPromise);
+ // And
+ // and we simulate that some one responded
+ verify(channelHandlerContext, times(5)).write(any(), any());
+ // Then
+ // We create SUT with 1 seconds timeout
+ TimeUnit.SECONDS.sleep(2);
+ // our complete container should complete exceptionally
+ verify(responseCompletableFuture).completeExceptionally(any());
+ // And we should have no memory leak
+ assertThat(SUT.getStatistics()).containsOnly(
+ entry("queue", 0),
+ entry("sentButUnacknowledgedSubContainer", 0),
+ entry("correlationToParentContainer", 0),
+ entry("containerCorrelationIdMap", 0),
+ entry("responsesToBeDelivered", 0),
+ entry("correlationIdGenerator", 5),
+ entry("deliveredItems", 0L),
+ entry("erroredItems", 5L),
+ entry("deliveredContainers", 0L),
+ entry("erroredContainers", 1L)
+ );
+ }
+
+ @SuppressWarnings("unchecked")
+ private Void produceReadResponse(PlcRequestContainer plcRequestContainer) {
+ InternalPlcReadRequest request = (InternalPlcReadRequest) plcRequestContainer.getRequest();
+ // TODO: we need a response for every item
+ String fieldName = request.getFieldNames().iterator().next();
+ CompletableFuture responseFuture = plcRequestContainer.getResponseFuture();
+ HashMap<String, Pair<PlcResponseCode, BaseDefaultFieldItem>> responseFields = new HashMap<>();
+ responseFields.put(fieldName, Pair.of(PlcResponseCode.OK, mock(BaseDefaultFieldItem.class)));
+ responseFuture.complete(new DefaultPlcReadResponse(request, responseFields));
+ return null;
+ }
}
- @Test
- void noRead() throws Exception {
- // Given
- // we have a simple read
- PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
- // When
- // we write this
- SUT.write(channelHandlerContext, msg, channelPromise);
- // And
- // and we simulate that some one responded
- verify(channelHandlerContext, times(5)).write(any(), any());
- // Then
- // We create SUT with 1 seconds timeout
- TimeUnit.SECONDS.sleep(2);
- // our complete container should complete normally
- verify(responseCompletableFuture).completeExceptionally(any());
- // And we should have no memory leak
- assertThat(SUT.getStatistics()).containsOnly(
- entry("queue", 0),
- entry("sentButUnacknowledgedSubContainer", 0),
- entry("correlationToParentContainer", 0),
- entry("containerCorrelationIdMap", 0),
- entry("responsesToBeDelivered", 0),
- entry("correlationIdGenerator", 5),
- entry("deliveredItems", 0L),
- entry("erroredItems", 5L),
- entry("deliveredContainers", 0L),
- entry("erroredContainers", 1L)
- );
+ @Nested
+ class Write {
+ // TODO: implement me
}
- @SuppressWarnings("unchecked")
- private Void produceReadResponse(PlcRequestContainer plcRequestContainer) {
- InternalPlcReadRequest request = (InternalPlcReadRequest) plcRequestContainer.getRequest();
- String fieldName = request.getFieldNames().iterator().next();
- CompletableFuture responseFuture = plcRequestContainer.getResponseFuture();
- HashMap<String, Pair<PlcResponseCode, BaseDefaultFieldItem>> responseFields = new HashMap<>();
- responseFields.put(fieldName, Pair.of(PlcResponseCode.OK, mock(BaseDefaultFieldItem.class)));
- responseFuture.complete(new DefaultPlcReadResponse(request, responseFields));
- return null;
+ @Nested
+ class Subscribe {
+ @Captor
+ ArgumentCaptor<PlcRequestContainer> plcRequestContainerArgumentCaptor;
+
+ @Test
+ void simpleSubscribe() throws Exception {
+ // Given
+ // we have a simple subscription
+ PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcSubscriptionRequest.build(mockSubscriber), responseCompletableFuture);
+ // When
+ // we write this
+ SUT.write(channelHandlerContext, msg, channelPromise);
+ // And
+ // and we simulate that all get responded
+ verify(channelHandlerContext, times(3)).write(plcRequestContainerArgumentCaptor.capture(), any());
+ List<PlcRequestContainer> capturedDownstreamContainers = plcRequestContainerArgumentCaptor.getAllValues();
+ capturedDownstreamContainers.forEach(this::produceSubscriptionResponse);
+ // Then
+ // our complete container should complete normally
+ verify(responseCompletableFuture).complete(any());
+ // And we should have no memory leak
+ assertThat(SUT.getStatistics()).containsOnly(
+ entry("queue", 0),
+ entry("sentButUnacknowledgedSubContainer", 0),
+ entry("correlationToParentContainer", 0),
+ entry("containerCorrelationIdMap", 0),
+ entry("responsesToBeDelivered", 0),
+ entry("correlationIdGenerator", 3),
+ entry("erroredItems", 0L),
+ entry("deliveredItems", 3L),
+ entry("deliveredContainers", 1L),
+ entry("erroredContainers", 0L)
+ );
+ }
+
+ @Test
+ void simpleUnsubscribe() {
+ // TODO: implement me
+ }
+
+ @SuppressWarnings("unchecked")
+ private Void produceSubscriptionResponse(PlcRequestContainer plcRequestContainer) {
+ InternalPlcSubscriptionRequest request = (InternalPlcSubscriptionRequest) plcRequestContainer.getRequest();
+ // TODO: we need a response for every item
+ String fieldName = request.getFieldNames().iterator().next();
+ CompletableFuture responseFuture = plcRequestContainer.getResponseFuture();
+ HashMap<String, Pair<PlcResponseCode, PlcSubscriptionHandle>> responseFields = new HashMap<>();
+ responseFields.put(fieldName, Pair.of(PlcResponseCode.OK, mock(PlcSubscriptionHandle.class)));
+ responseFuture.complete(new DefaultPlcSubscriptionResponse(request, responseFields));
+ return null;
+ }
}
}
@@ -437,8 +508,11 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
}
private static TestDefaultPlcSubscriptionRequest build(PlcSubscriber subscriber) {
- // TODO: implement me once available
- return new TestDefaultPlcSubscriptionRequest(subscriber, new LinkedHashMap<>());
+ LinkedHashMap<String, SubscriptionPlcField> fields = new LinkedHashMap<>();
+ fields.put("sub1", mock(SubscriptionPlcField.class));
+ fields.put("sub2", mock(SubscriptionPlcField.class));
+ fields.put("sub3", mock(SubscriptionPlcField.class));
+ return new TestDefaultPlcSubscriptionRequest(subscriber, fields);
}
}
@@ -449,8 +523,11 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
}
private static TestDefaultPlcUnsubscriptionRequest build(PlcSubscriber subscriber) {
- // TODO: implement me once available
- return new TestDefaultPlcUnsubscriptionRequest(subscriber, Collections.emptyList());
+ List<InternalPlcSubscriptionHandle> internalPlcSubscriptionHandles = new LinkedList<>();
+ internalPlcSubscriptionHandles.add(mock(InternalPlcSubscriptionHandle.class));
+ internalPlcSubscriptionHandles.add(mock(InternalPlcSubscriptionHandle.class));
+ internalPlcSubscriptionHandles.add(mock(InternalPlcSubscriptionHandle.class));
+ return new TestDefaultPlcUnsubscriptionRequest(subscriber, internalPlcSubscriptionHandles);
}
}
}
\ No newline at end of file
diff --git a/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/connection/ModbusTcpPlcConnection.java b/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/connection/ModbusTcpPlcConnection.java
index 8df399e..e2ff401 100644
--- a/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/connection/ModbusTcpPlcConnection.java
+++ b/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/connection/ModbusTcpPlcConnection.java
@@ -24,8 +24,6 @@ import com.digitalpetri.modbus.codec.ModbusTcpCodec;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
-import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
-import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
import org.apache.plc4x.java.base.connection.ChannelFactory;
import org.apache.plc4x.java.base.connection.TcpSocketChannelFactory;
import org.apache.plc4x.java.base.protocol.SingleItemToSingleRequestProtocol;
@@ -34,7 +32,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
public class ModbusTcpPlcConnection extends BaseModbusPlcConnection {
@@ -72,7 +69,7 @@ public class ModbusTcpPlcConnection extends BaseModbusPlcConnection {
protected void initChannel(Channel channel) {
channel.pipeline().addLast(new ModbusTcpCodec(new ModbusRequestEncoder(), new ModbusResponseDecoder()));
channel.pipeline().addLast(new Plc4XModbusProtocol());
- channel.pipeline().addLast(new SingleItemToSingleRequestProtocol(ModbusTcpPlcConnection.this, ModbusTcpPlcConnection.this, timer));
+ channel.pipeline().addLast(new SingleItemToSingleRequestProtocol(ModbusTcpPlcConnection.this, ModbusTcpPlcConnection.this, null, timer));
}
};
}