You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2018/10/29 08:09:47 UTC

[incubator-plc4x] 01/02: [driver-bases] re-enabled unsubscription support on SingleItemToSingleRequestProtocol

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 6fc11d9d8cb4794c05f998965869080ab07b7571
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Oct 29 07:54:08 2018 +0100

    [driver-bases] re-enabled unsubscription support on SingleItemToSingleRequestProtocol
---
 .../java/ads/connection/AdsTcpPlcConnection.java   |  2 +-
 .../messages/DefaultPlcUnsubscriptionResponse.java |  8 +++-
 .../InternalPlcUnsubscriptionResponse.java         |  2 +-
 .../SingleItemToSingleRequestProtocol.java         | 53 +++++++++++++++++++++-
 .../SingleItemToSingleRequestProtocolTest.java     | 41 ++++++++++++++++-
 5 files changed, 99 insertions(+), 7 deletions(-)

diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
index cbd5a5d..5d10a01 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
@@ -255,7 +255,7 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc
             }
         }
         CompletableFuture<PlcUnsubscriptionResponse> future = new CompletableFuture<>();
-        future.complete(new DefaultPlcUnsubscriptionResponse());
+        future.complete(new DefaultPlcUnsubscriptionResponse(internalPlcUnsubscriptionRequest));
         return future;
     }
 
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionResponse.java
index e3f855a..c26ac85 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionResponse.java
@@ -22,9 +22,15 @@ import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
 
 public class DefaultPlcUnsubscriptionResponse implements InternalPlcUnsubscriptionResponse {
 
+    private final InternalPlcUnsubscriptionRequest request;
+
+    public DefaultPlcUnsubscriptionResponse(InternalPlcUnsubscriptionRequest request) {
+        this.request = request;
+    }
+
     @Override
     public PlcUnsubscriptionRequest getRequest() {
-        return null;
+        return request;
     }
 
 }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcUnsubscriptionResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcUnsubscriptionResponse.java
index a857cb6..8529d55 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcUnsubscriptionResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcUnsubscriptionResponse.java
@@ -20,6 +20,6 @@ package org.apache.plc4x.java.base.messages;
 
 import org.apache.plc4x.java.api.messages.PlcUnsubscriptionResponse;
 
-public interface InternalPlcUnsubscriptionResponse extends PlcUnsubscriptionResponse {
+public interface InternalPlcUnsubscriptionResponse extends PlcUnsubscriptionResponse, InternalPlcResponse {
 
 }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
index 22c210e..a6f1471 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
@@ -32,6 +32,7 @@ import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
 import org.apache.plc4x.java.api.types.PlcResponseCode;
 import org.apache.plc4x.java.base.messages.*;
 import org.apache.plc4x.java.base.messages.items.BaseDefaultFieldItem;
+import org.apache.plc4x.java.base.model.InternalPlcSubscriptionHandle;
 import org.apache.plc4x.java.base.model.SubscriptionPlcField;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,6 +46,7 @@ import java.util.stream.Collectors;
 /**
  * This layer can be used to split a {@link org.apache.plc4x.java.api.messages.PlcRequest} which addresses multiple {@link PlcField}s into multiple subsequent {@link org.apache.plc4x.java.api.messages.PlcRequest}s.
  */
+// TODO: add split config so we can override special requests that are allready splitted downstream
 public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
 
     public static final Logger LOGGER = LoggerFactory.getLogger(SingleItemToSingleRequestProtocol.class);
@@ -217,6 +219,9 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
                     .forEach(stringPairMap -> stringPairMap.forEach(fields::put));
 
                 plcResponse = new DefaultPlcSubscriptionResponse(internalPlcSubscriptionRequest, fields);
+            } else if (originalPlcRequestContainer.getRequest() instanceof InternalPlcUnsubscriptionRequest) {
+                InternalPlcUnsubscriptionRequest internalPlcUnsubscriptionRequest = (InternalPlcUnsubscriptionRequest) originalPlcRequestContainer.getRequest();
+                plcResponse = new DefaultPlcUnsubscriptionResponse(internalPlcUnsubscriptionRequest);
             } else {
                 errored(currentTdpu, new PlcProtocolException("Unknown type detected " + originalPlcRequestContainer.getRequest().getClass()), originalResponseFuture);
                 return;
@@ -327,7 +332,6 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
                         }
                         promiseCombiner.add((Future) subPromise);
                     });
-                    // TODO: add sub/unsub
                 } else if (internalPlcFieldRequest instanceof InternalPlcSubscriptionRequest) {
                     InternalPlcSubscriptionRequest internalPlcSubscriptionRequest = (InternalPlcSubscriptionRequest) internalPlcFieldRequest;
                     internalPlcSubscriptionRequest.getNamedSubscriptionFields().forEach(field -> {
@@ -353,10 +357,34 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
                         }
                         promiseCombiner.add((Future) subPromise);
                     });
-                    // TODO: add sub/unsub
                 } else {
                     throw new PlcProtocolException("Unmapped request type " + request.getClass());
                 }
+            } else if (request instanceof InternalPlcUnsubscriptionRequest) {
+                InternalPlcUnsubscriptionRequest internalPlcUnsubscriptionRequest = (InternalPlcUnsubscriptionRequest) request;
+                internalPlcUnsubscriptionRequest.getInternalPlcSubscriptionHandles().forEach(handle -> {
+                    ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
+
+                    Integer tdpu = correlationIdGenerator.getAndIncrement();
+                    CompletableFuture<InternalPlcResponse> correlatedCompletableFuture = new CompletableFuture<>();
+                    // Important: don't chain to above as we want the above to be completed not the result of when complete
+                    correlatedCompletableFuture
+                        .thenApply(InternalPlcResponse.class::cast)
+                        .whenComplete((internalPlcResponse, throwable) -> {
+                            if (throwable != null) {
+                                errored(tdpu, throwable, in.getResponseFuture());
+                            } else {
+                                tryFinish(tdpu, internalPlcResponse, in.getResponseFuture());
+                            }
+                        });
+                    PlcRequestContainer<CorrelatedPlcUnsubscriptionRequest, InternalPlcResponse> correlatedPlcRequestContainer = new PlcRequestContainer<>(CorrelatedPlcUnsubscriptionRequest.of(subscriber, handle, tdpu), correlatedCompletableFuture);
+                    correlationToParentContainer.put(tdpu, in);
+                    queue.add(correlatedPlcRequestContainer, subPromise);
+                    if (!tdpus.add(tdpu)) {
+                        throw new IllegalStateException("AtomicInteger should not create duplicated ids: " + tdpu);
+                    }
+                    promiseCombiner.add((Future) subPromise);
+                });
             } else {
                 ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
                 queue.add(msg, subPromise);
@@ -492,6 +520,27 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
         }
     }
 
+    protected static class CorrelatedPlcUnsubscriptionRequest extends DefaultPlcUnsubscriptionRequest implements CorrelatedPlcRequest {
+
+        protected final int tdpu;
+
+        protected CorrelatedPlcUnsubscriptionRequest(PlcSubscriber subscriber, LinkedList<InternalPlcSubscriptionHandle> subscriptionHandles, int tdpu) {
+            super(subscriber, subscriptionHandles);
+            this.tdpu = tdpu;
+        }
+
+        protected static CorrelatedPlcUnsubscriptionRequest of(PlcSubscriber subscriber, InternalPlcSubscriptionHandle subscriptionHandle, int tdpu) {
+            LinkedList<InternalPlcSubscriptionHandle> list = new LinkedList<>();
+            list.add(subscriptionHandle);
+            return new CorrelatedPlcUnsubscriptionRequest(subscriber, list, tdpu);
+        }
+
+        @Override
+        public int getTdpu() {
+            return tdpu;
+        }
+    }
+
     // TODO: maybe export to jmx
     public Map<String, Number> getStatistics() {
         HashMap<String, Number> statistics = new HashMap<>();
diff --git a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
index fb85d2b..f3a3592 100644
--- a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
+++ b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
@@ -344,8 +344,34 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
             }
 
             @Test
-            void simpleUnsubscribe() {
-                // TODO: implement me
+            void simpleUnsubscribe() throws Exception {
+                // Given
+                // we have a simple read
+                PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcUnsubscriptionRequest.build(mockSubscriber), responseCompletableFuture);
+                // When
+                // we write this
+                SUT.write(channelHandlerContext, msg, channelPromise);
+                // And
+                // and we simulate that all get responded
+                verify(channelHandlerContext, times(3)).write(plcRequestContainerArgumentCaptor.capture(), any());
+                List<PlcRequestContainer> capturedDownstreamContainers = plcRequestContainerArgumentCaptor.getAllValues();
+                capturedDownstreamContainers.forEach(this::produceUnsubscriptionResponse);
+                // Then
+                // our complete container should complete normally
+                verify(responseCompletableFuture).complete(any());
+                // And we should have no memory leak
+                assertThat(SUT.getStatistics()).containsOnly(
+                    entry("queue", 0),
+                    entry("sentButUnacknowledgedSubContainer", 0),
+                    entry("correlationToParentContainer", 0),
+                    entry("containerCorrelationIdMap", 0),
+                    entry("responsesToBeDelivered", 0),
+                    entry("correlationIdGenerator", 3),
+                    entry("erroredItems", 0L),
+                    entry("deliveredItems", 3L),
+                    entry("deliveredContainers", 1L),
+                    entry("erroredContainers", 0L)
+                );
             }
 
             @SuppressWarnings("unchecked")
@@ -359,6 +385,17 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
                 responseFuture.complete(new DefaultPlcSubscriptionResponse(request, responseFields));
                 return null;
             }
+
+            @SuppressWarnings("unchecked")
+            private Void produceUnsubscriptionResponse(PlcRequestContainer plcRequestContainer) {
+                InternalPlcUnsubscriptionRequest request = (InternalPlcUnsubscriptionRequest) plcRequestContainer.getRequest();
+                // TODO: we need a response for every item
+                InternalPlcSubscriptionHandle internalPlcSubscriptionHandle = request.getInternalPlcSubscriptionHandles().iterator().next();
+                // TODO: handles ignored for now.
+                CompletableFuture responseFuture = plcRequestContainer.getResponseFuture();
+                responseFuture.complete(new DefaultPlcUnsubscriptionResponse(request));
+                return null;
+            }
         }
     }