You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2018/10/29 08:09:47 UTC
[incubator-plc4x] 01/02: [driver-bases] re-enabled unsubscription
support on SingleItemToSingleRequestProtocol
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
commit 6fc11d9d8cb4794c05f998965869080ab07b7571
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Oct 29 07:54:08 2018 +0100
[driver-bases] re-enabled unsubscription support on SingleItemToSingleRequestProtocol
---
.../java/ads/connection/AdsTcpPlcConnection.java | 2 +-
.../messages/DefaultPlcUnsubscriptionResponse.java | 8 +++-
.../InternalPlcUnsubscriptionResponse.java | 2 +-
.../SingleItemToSingleRequestProtocol.java | 53 +++++++++++++++++++++-
.../SingleItemToSingleRequestProtocolTest.java | 41 ++++++++++++++++-
5 files changed, 99 insertions(+), 7 deletions(-)
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
index cbd5a5d..5d10a01 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
@@ -255,7 +255,7 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc
}
}
CompletableFuture<PlcUnsubscriptionResponse> future = new CompletableFuture<>();
- future.complete(new DefaultPlcUnsubscriptionResponse());
+ future.complete(new DefaultPlcUnsubscriptionResponse(internalPlcUnsubscriptionRequest));
return future;
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionResponse.java
index e3f855a..c26ac85 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionResponse.java
@@ -22,9 +22,15 @@ import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
public class DefaultPlcUnsubscriptionResponse implements InternalPlcUnsubscriptionResponse {
+ private final InternalPlcUnsubscriptionRequest request;
+
+ public DefaultPlcUnsubscriptionResponse(InternalPlcUnsubscriptionRequest request) {
+ this.request = request;
+ }
+
@Override
public PlcUnsubscriptionRequest getRequest() {
- return null;
+ return request;
}
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcUnsubscriptionResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcUnsubscriptionResponse.java
index a857cb6..8529d55 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcUnsubscriptionResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcUnsubscriptionResponse.java
@@ -20,6 +20,6 @@ package org.apache.plc4x.java.base.messages;
import org.apache.plc4x.java.api.messages.PlcUnsubscriptionResponse;
-public interface InternalPlcUnsubscriptionResponse extends PlcUnsubscriptionResponse {
+public interface InternalPlcUnsubscriptionResponse extends PlcUnsubscriptionResponse, InternalPlcResponse {
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
index 22c210e..a6f1471 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
@@ -32,6 +32,7 @@ import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.java.base.messages.*;
import org.apache.plc4x.java.base.messages.items.BaseDefaultFieldItem;
+import org.apache.plc4x.java.base.model.InternalPlcSubscriptionHandle;
import org.apache.plc4x.java.base.model.SubscriptionPlcField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +46,7 @@ import java.util.stream.Collectors;
/**
* This layer can be used to split a {@link org.apache.plc4x.java.api.messages.PlcRequest} which addresses multiple {@link PlcField}s into multiple subsequent {@link org.apache.plc4x.java.api.messages.PlcRequest}s.
*/
+// TODO: add split config so we can override special requests that are allready splitted downstream
public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
public static final Logger LOGGER = LoggerFactory.getLogger(SingleItemToSingleRequestProtocol.class);
@@ -217,6 +219,9 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
.forEach(stringPairMap -> stringPairMap.forEach(fields::put));
plcResponse = new DefaultPlcSubscriptionResponse(internalPlcSubscriptionRequest, fields);
+ } else if (originalPlcRequestContainer.getRequest() instanceof InternalPlcUnsubscriptionRequest) {
+ InternalPlcUnsubscriptionRequest internalPlcUnsubscriptionRequest = (InternalPlcUnsubscriptionRequest) originalPlcRequestContainer.getRequest();
+ plcResponse = new DefaultPlcUnsubscriptionResponse(internalPlcUnsubscriptionRequest);
} else {
errored(currentTdpu, new PlcProtocolException("Unknown type detected " + originalPlcRequestContainer.getRequest().getClass()), originalResponseFuture);
return;
@@ -327,7 +332,6 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
}
promiseCombiner.add((Future) subPromise);
});
- // TODO: add sub/unsub
} else if (internalPlcFieldRequest instanceof InternalPlcSubscriptionRequest) {
InternalPlcSubscriptionRequest internalPlcSubscriptionRequest = (InternalPlcSubscriptionRequest) internalPlcFieldRequest;
internalPlcSubscriptionRequest.getNamedSubscriptionFields().forEach(field -> {
@@ -353,10 +357,34 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
}
promiseCombiner.add((Future) subPromise);
});
- // TODO: add sub/unsub
} else {
throw new PlcProtocolException("Unmapped request type " + request.getClass());
}
+ } else if (request instanceof InternalPlcUnsubscriptionRequest) {
+ InternalPlcUnsubscriptionRequest internalPlcUnsubscriptionRequest = (InternalPlcUnsubscriptionRequest) request;
+ internalPlcUnsubscriptionRequest.getInternalPlcSubscriptionHandles().forEach(handle -> {
+ ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
+
+ Integer tdpu = correlationIdGenerator.getAndIncrement();
+ CompletableFuture<InternalPlcResponse> correlatedCompletableFuture = new CompletableFuture<>();
+ // Important: don't chain to above as we want the above to be completed not the result of when complete
+ correlatedCompletableFuture
+ .thenApply(InternalPlcResponse.class::cast)
+ .whenComplete((internalPlcResponse, throwable) -> {
+ if (throwable != null) {
+ errored(tdpu, throwable, in.getResponseFuture());
+ } else {
+ tryFinish(tdpu, internalPlcResponse, in.getResponseFuture());
+ }
+ });
+ PlcRequestContainer<CorrelatedPlcUnsubscriptionRequest, InternalPlcResponse> correlatedPlcRequestContainer = new PlcRequestContainer<>(CorrelatedPlcUnsubscriptionRequest.of(subscriber, handle, tdpu), correlatedCompletableFuture);
+ correlationToParentContainer.put(tdpu, in);
+ queue.add(correlatedPlcRequestContainer, subPromise);
+ if (!tdpus.add(tdpu)) {
+ throw new IllegalStateException("AtomicInteger should not create duplicated ids: " + tdpu);
+ }
+ promiseCombiner.add((Future) subPromise);
+ });
} else {
ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
queue.add(msg, subPromise);
@@ -492,6 +520,27 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
}
}
+ protected static class CorrelatedPlcUnsubscriptionRequest extends DefaultPlcUnsubscriptionRequest implements CorrelatedPlcRequest {
+
+ protected final int tdpu;
+
+ protected CorrelatedPlcUnsubscriptionRequest(PlcSubscriber subscriber, LinkedList<InternalPlcSubscriptionHandle> subscriptionHandles, int tdpu) {
+ super(subscriber, subscriptionHandles);
+ this.tdpu = tdpu;
+ }
+
+ protected static CorrelatedPlcUnsubscriptionRequest of(PlcSubscriber subscriber, InternalPlcSubscriptionHandle subscriptionHandle, int tdpu) {
+ LinkedList<InternalPlcSubscriptionHandle> list = new LinkedList<>();
+ list.add(subscriptionHandle);
+ return new CorrelatedPlcUnsubscriptionRequest(subscriber, list, tdpu);
+ }
+
+ @Override
+ public int getTdpu() {
+ return tdpu;
+ }
+ }
+
// TODO: maybe export to jmx
public Map<String, Number> getStatistics() {
HashMap<String, Number> statistics = new HashMap<>();
diff --git a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
index fb85d2b..f3a3592 100644
--- a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
+++ b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
@@ -344,8 +344,34 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
}
@Test
- void simpleUnsubscribe() {
- // TODO: implement me
+ void simpleUnsubscribe() throws Exception {
+ // Given
+ // we have a simple read
+ PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcUnsubscriptionRequest.build(mockSubscriber), responseCompletableFuture);
+ // When
+ // we write this
+ SUT.write(channelHandlerContext, msg, channelPromise);
+ // And
+ // and we simulate that all get responded
+ verify(channelHandlerContext, times(3)).write(plcRequestContainerArgumentCaptor.capture(), any());
+ List<PlcRequestContainer> capturedDownstreamContainers = plcRequestContainerArgumentCaptor.getAllValues();
+ capturedDownstreamContainers.forEach(this::produceUnsubscriptionResponse);
+ // Then
+ // our complete container should complete normally
+ verify(responseCompletableFuture).complete(any());
+ // And we should have no memory leak
+ assertThat(SUT.getStatistics()).containsOnly(
+ entry("queue", 0),
+ entry("sentButUnacknowledgedSubContainer", 0),
+ entry("correlationToParentContainer", 0),
+ entry("containerCorrelationIdMap", 0),
+ entry("responsesToBeDelivered", 0),
+ entry("correlationIdGenerator", 3),
+ entry("erroredItems", 0L),
+ entry("deliveredItems", 3L),
+ entry("deliveredContainers", 1L),
+ entry("erroredContainers", 0L)
+ );
}
@SuppressWarnings("unchecked")
@@ -359,6 +385,17 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
responseFuture.complete(new DefaultPlcSubscriptionResponse(request, responseFields));
return null;
}
+
+ @SuppressWarnings("unchecked")
+ private Void produceUnsubscriptionResponse(PlcRequestContainer plcRequestContainer) {
+ InternalPlcUnsubscriptionRequest request = (InternalPlcUnsubscriptionRequest) plcRequestContainer.getRequest();
+ // TODO: we need a response for every item
+ InternalPlcSubscriptionHandle internalPlcSubscriptionHandle = request.getInternalPlcSubscriptionHandles().iterator().next();
+ // TODO: handles ignored for now.
+ CompletableFuture responseFuture = plcRequestContainer.getResponseFuture();
+ responseFuture.complete(new DefaultPlcUnsubscriptionResponse(request));
+ return null;
+ }
}
}