You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by jf...@apache.org on 2019/12/18 22:58:49 UTC

[plc4x] branch next-gen-core updated: Implemented Response time

This is an automated email from the ASF dual-hosted git repository.

jfeinauer pushed a commit to branch next-gen-core
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/next-gen-core by this push:
     new 852522e  Implemented Response time
852522e is described below

commit 852522efd0c2c616f855a2152055521cbeb213dd
Author: Julian Feinauer <j....@pragmaticminds.de>
AuthorDate: Wed Dec 18 23:56:57 2019 +0100

    Implemented Response time
---
 .../java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java  | 14 +++++++++++---
 .../plc4x/java/spi/internal/DefaultSendRequestContext.java | 11 ++++++++---
 .../plc4x/java/spi/internal/HandlerRegistration.java       |  9 ++++++++-
 .../plc4x/java/s7/readwrite/protocol/Plc4xS7Protocol.java  |  6 +++---
 4 files changed, 30 insertions(+), 10 deletions(-)

diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
index 126d6e2..c03933a 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
@@ -30,7 +30,7 @@ import org.apache.plc4x.java.spi.messages.PlcRequestContainer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
+import java.time.Instant;
 import java.util.Deque;
 import java.util.Iterator;
 import java.util.List;
@@ -69,15 +69,23 @@ public class Plc4xNettyWrapper<T> extends MessageToMessageCodec<T, PlcRequestCon
     protected void decode(ChannelHandlerContext channelHandlerContext, T t, List<Object> list) throws Exception {
         logger.info("Decoding {}", t);
         // Just iterate the list to find a suitable  Handler
+
         registrations:
-        for (HandlerRegistration registration : this.registeredHandlers) {
+        for (Iterator<HandlerRegistration> iter = this.registeredHandlers.iterator(); iter.hasNext(); ) {
+            HandlerRegistration registration = iter.next();
+            // Check if the handler can still be used or should be removed
+            if (registration.getTimeout().isBefore(Instant.now())) {
+                logger.info("Removing {} as its timed out (was set till {})", registration, registration.getTimeout());
+                iter.remove();
+                continue;
+            }
             logger.info("Checking handler {} for Object of type {}", registration, t.getClass().getSimpleName());
             if (registration.getExpectClazz().isInstance(t)) {
                 logger.info("Handler {} has right expected type {}, checking condition", registration, registration.getExpectClazz().getSimpleName());
                 // Check all Commands / Functions
                 Deque<Either<Function<?, ?>, Predicate<?>>> commands = registration.getCommands();
                 Object instance = t;
-                for (Iterator<Either<Function<?, ?>, Predicate<?>>> iterator = commands.iterator(); iterator.hasNext();) {
+                for (Iterator<Either<Function<?, ?>, Predicate<?>>> iterator = commands.iterator(); iterator.hasNext(); ) {
                     Either<Function<?, ?>, Predicate<?>> either = iterator.next();
                     if (either.isLeft()) {
                         Function unwrap = either.getLeft();
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java
index 6be0ba1..45bc4fb 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java
@@ -24,6 +24,7 @@ import org.apache.plc4x.java.spi.ConversationContext;
 import org.apache.plc4x.java.spi.Plc4xNettyWrapper;
 
 import java.time.Duration;
+import java.time.Instant;
 import java.util.Deque;
 import java.util.LinkedList;
 import java.util.concurrent.TimeoutException;
@@ -51,14 +52,17 @@ public class DefaultSendRequestContext<T> implements ConversationContext.SendReq
 
     protected BiConsumer<?, ? extends Throwable> errorConsumer;
 
+    protected Duration timeout;
+
     public DefaultSendRequestContext(Consumer<HandlerRegistration> finisher, T request, Plc4xNettyWrapper<T>.DefaultConversationContext<T> context) {
         this.finisher = finisher;
         this.request = request;
         this.context = context;
     }
 
-    protected DefaultSendRequestContext(Deque<Either<Function<?, ?>, Predicate<?>>> commands, Consumer<HandlerRegistration> finisher, Object request, Plc4xNettyWrapper<?>.DefaultConversationContext<?> context, Class<?> expectClazz, Consumer<?> packetConsumer, Consumer<TimeoutException> onTimeoutConsumer, BiConsumer<?, ? extends Throwable> errorConsumer) {
+    protected DefaultSendRequestContext(Deque<Either<Function<?, ?>, Predicate<?>>> commands, Duration timeout, Consumer<HandlerRegistration> finisher, Object request, Plc4xNettyWrapper<?>.DefaultConversationContext<?> context, Class<?> expectClazz, Consumer<?> packetConsumer, Consumer<TimeoutException> onTimeoutConsumer, BiConsumer<?, ? extends Throwable> errorConsumer) {
         this.commands = commands;
+        this.timeout = timeout;
         this.finisher = finisher;
         this.request = request;
         this.context = context;
@@ -70,6 +74,7 @@ public class DefaultSendRequestContext<T> implements ConversationContext.SendReq
 
     @Override
     public ConversationContext.SendRequestContext<T> expectResponse(Class<T> clazz, Duration timeout) {
+        this.timeout = timeout;
         if (expectClazz != null) {
             throw new ConversationContext.PlcWiringException("can't expect class of type " + clazz + " as we already expecting clazz of type " + expectClazz);
         }
@@ -90,7 +95,7 @@ public class DefaultSendRequestContext<T> implements ConversationContext.SendReq
             throw new ConversationContext.PlcWiringException("can't handle multiple consumers");
         }
         this.packetConsumer = packetConsumer;
-        finisher.accept(new HandlerRegistration(commands, expectClazz, packetConsumer, onTimeoutConsumer, errorConsumer));
+        finisher.accept(new HandlerRegistration(commands, expectClazz, packetConsumer, onTimeoutConsumer, errorConsumer, Instant.now().plus(timeout)));
         context.sendToWire(request);
     }
 
@@ -123,7 +128,7 @@ public class DefaultSendRequestContext<T> implements ConversationContext.SendReq
             };
         }
         commands.addLast(Either.left(unwrapper));
-        return new DefaultSendRequestContext<R>(commands, finisher, request, context, expectClazz, packetConsumer, onTimeoutConsumer, errorConsumer);
+        return new DefaultSendRequestContext<R>(commands, timeout, finisher, request, context, expectClazz, packetConsumer, onTimeoutConsumer, errorConsumer);
     }
 
     @Override
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java
index 2fa47a0..060a17c 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java
@@ -21,6 +21,7 @@ package org.apache.plc4x.java.spi.internal;
 
 import io.vavr.control.Either;
 
+import java.time.Instant;
 import java.util.Deque;
 import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
@@ -43,13 +44,15 @@ public class HandlerRegistration {
     private final Consumer<TimeoutException> onTimeoutConsumer;
 
     private final BiConsumer<?, ? extends Throwable> errorConsumer;
+    private final Instant timeout;
 
-    public HandlerRegistration(Deque<Either<Function<?, ?>, Predicate<?>>> commands, Class<?> expectClazz, Consumer<?> packetConsumer, Consumer<TimeoutException> onTimeoutConsumer, BiConsumer<?, ? extends Throwable> errorConsumer) {
+    public HandlerRegistration(Deque<Either<Function<?, ?>, Predicate<?>>> commands, Class<?> expectClazz, Consumer<?> packetConsumer, Consumer<TimeoutException> onTimeoutConsumer, BiConsumer<?, ? extends Throwable> errorConsumer, Instant timeout) {
         this.commands = commands;
         this.expectClazz = expectClazz;
         this.packetConsumer = packetConsumer;
         this.onTimeoutConsumer = onTimeoutConsumer;
         this.errorConsumer = errorConsumer;
+        this.timeout = timeout;
     }
 
     public Deque<Either<Function<?, ?>, Predicate<?>>> getCommands() {
@@ -72,6 +75,10 @@ public class HandlerRegistration {
         return errorConsumer;
     }
 
+    public Instant getTimeout() {
+        return timeout;
+    }
+
     @Override public String toString() {
         return "HandlerRegistration#" + id;
     }
diff --git a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/Plc4xS7Protocol.java b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/Plc4xS7Protocol.java
index 70e91d2..14fabb5 100644
--- a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/Plc4xS7Protocol.java
+++ b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/Plc4xS7Protocol.java
@@ -89,12 +89,12 @@ public class Plc4xS7Protocol extends Plc4xProtocolBase<TPKTPacket> {
         TPKTPacket packet = new TPKTPacket(createCOTPConnectionRequest(calledTsapId, callingTsapId, cotpTpduSize));
 
         context.sendRequest(packet)
-            .expectResponse(TPKTPacket.class, Duration.ofMillis(100))
+            .expectResponse(TPKTPacket.class, Duration.ofMillis(1000))
             .check(p -> p.getPayload() instanceof COTPPacketConnectionResponse)
             .unwrap(p -> (COTPPacketConnectionResponse) p.getPayload())
             .handle(cotpPacketConnectionResponse -> {
                 context.sendRequest(createS7ConnectionRequest(cotpPacketConnectionResponse))
-                    .expectResponse(TPKTPacket.class, Duration.ofMillis(100))
+                    .expectResponse(TPKTPacket.class, Duration.ofMillis(1000))
                     .unwrap(TPKTPacket::getPayload)
                     .only(COTPPacketData.class)
                     .unwrap(COTPPacket::getPayload)
@@ -116,7 +116,7 @@ public class Plc4xS7Protocol extends Plc4xProtocolBase<TPKTPacket> {
                         // Prepare a message to request the remote to identify itself.
                         TPKTPacket tpktPacket = createIdentifyRemoteMessage();
                         context.sendRequest(tpktPacket)
-                            .expectResponse(TPKTPacket.class, Duration.ofMillis(100))
+                            .expectResponse(TPKTPacket.class, Duration.ofMillis(1000))
                             .check(p -> p.getPayload() instanceof COTPPacketData)
                             .unwrap(p -> ((COTPPacketData) p.getPayload()))
                             .check(p -> p.getPayload() instanceof S7MessageUserData)