You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by jf...@apache.org on 2019/12/18 22:58:49 UTC
[plc4x] branch next-gen-core updated: Implemented Response time
This is an automated email from the ASF dual-hosted git repository.
jfeinauer pushed a commit to branch next-gen-core
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/next-gen-core by this push:
new 852522e Implemented Response time
852522e is described below
commit 852522efd0c2c616f855a2152055521cbeb213dd
Author: Julian Feinauer <j....@pragmaticminds.de>
AuthorDate: Wed Dec 18 23:56:57 2019 +0100
Implemented Response time
---
.../java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java | 14 +++++++++++---
.../plc4x/java/spi/internal/DefaultSendRequestContext.java | 11 ++++++++---
.../plc4x/java/spi/internal/HandlerRegistration.java | 9 ++++++++-
.../plc4x/java/s7/readwrite/protocol/Plc4xS7Protocol.java | 6 +++---
4 files changed, 30 insertions(+), 10 deletions(-)
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
index 126d6e2..c03933a 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
@@ -30,7 +30,7 @@ import org.apache.plc4x.java.spi.messages.PlcRequestContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
+import java.time.Instant;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
@@ -69,15 +69,23 @@ public class Plc4xNettyWrapper<T> extends MessageToMessageCodec<T, PlcRequestCon
protected void decode(ChannelHandlerContext channelHandlerContext, T t, List<Object> list) throws Exception {
logger.info("Decoding {}", t);
// Just iterate the list to find a suitable Handler
+
registrations:
- for (HandlerRegistration registration : this.registeredHandlers) {
+ for (Iterator<HandlerRegistration> iter = this.registeredHandlers.iterator(); iter.hasNext(); ) {
+ HandlerRegistration registration = iter.next();
+ // Check if the handler can still be used or should be removed
+ if (registration.getTimeout().isBefore(Instant.now())) {
+ logger.info("Removing {} as its timed out (was set till {})", registration, registration.getTimeout());
+ iter.remove();
+ continue;
+ }
logger.info("Checking handler {} for Object of type {}", registration, t.getClass().getSimpleName());
if (registration.getExpectClazz().isInstance(t)) {
logger.info("Handler {} has right expected type {}, checking condition", registration, registration.getExpectClazz().getSimpleName());
// Check all Commands / Functions
Deque<Either<Function<?, ?>, Predicate<?>>> commands = registration.getCommands();
Object instance = t;
- for (Iterator<Either<Function<?, ?>, Predicate<?>>> iterator = commands.iterator(); iterator.hasNext();) {
+ for (Iterator<Either<Function<?, ?>, Predicate<?>>> iterator = commands.iterator(); iterator.hasNext(); ) {
Either<Function<?, ?>, Predicate<?>> either = iterator.next();
if (either.isLeft()) {
Function unwrap = either.getLeft();
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java
index 6be0ba1..45bc4fb 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java
@@ -24,6 +24,7 @@ import org.apache.plc4x.java.spi.ConversationContext;
import org.apache.plc4x.java.spi.Plc4xNettyWrapper;
import java.time.Duration;
+import java.time.Instant;
import java.util.Deque;
import java.util.LinkedList;
import java.util.concurrent.TimeoutException;
@@ -51,14 +52,17 @@ public class DefaultSendRequestContext<T> implements ConversationContext.SendReq
protected BiConsumer<?, ? extends Throwable> errorConsumer;
+ protected Duration timeout;
+
public DefaultSendRequestContext(Consumer<HandlerRegistration> finisher, T request, Plc4xNettyWrapper<T>.DefaultConversationContext<T> context) {
this.finisher = finisher;
this.request = request;
this.context = context;
}
- protected DefaultSendRequestContext(Deque<Either<Function<?, ?>, Predicate<?>>> commands, Consumer<HandlerRegistration> finisher, Object request, Plc4xNettyWrapper<?>.DefaultConversationContext<?> context, Class<?> expectClazz, Consumer<?> packetConsumer, Consumer<TimeoutException> onTimeoutConsumer, BiConsumer<?, ? extends Throwable> errorConsumer) {
+ protected DefaultSendRequestContext(Deque<Either<Function<?, ?>, Predicate<?>>> commands, Duration timeout, Consumer<HandlerRegistration> finisher, Object request, Plc4xNettyWrapper<?>.DefaultConversationContext<?> context, Class<?> expectClazz, Consumer<?> packetConsumer, Consumer<TimeoutException> onTimeoutConsumer, BiConsumer<?, ? extends Throwable> errorConsumer) {
this.commands = commands;
+ this.timeout = timeout;
this.finisher = finisher;
this.request = request;
this.context = context;
@@ -70,6 +74,7 @@ public class DefaultSendRequestContext<T> implements ConversationContext.SendReq
@Override
public ConversationContext.SendRequestContext<T> expectResponse(Class<T> clazz, Duration timeout) {
+ this.timeout = timeout;
if (expectClazz != null) {
throw new ConversationContext.PlcWiringException("can't expect class of type " + clazz + " as we already expecting clazz of type " + expectClazz);
}
@@ -90,7 +95,7 @@ public class DefaultSendRequestContext<T> implements ConversationContext.SendReq
throw new ConversationContext.PlcWiringException("can't handle multiple consumers");
}
this.packetConsumer = packetConsumer;
- finisher.accept(new HandlerRegistration(commands, expectClazz, packetConsumer, onTimeoutConsumer, errorConsumer));
+ finisher.accept(new HandlerRegistration(commands, expectClazz, packetConsumer, onTimeoutConsumer, errorConsumer, Instant.now().plus(timeout)));
context.sendToWire(request);
}
@@ -123,7 +128,7 @@ public class DefaultSendRequestContext<T> implements ConversationContext.SendReq
};
}
commands.addLast(Either.left(unwrapper));
- return new DefaultSendRequestContext<R>(commands, finisher, request, context, expectClazz, packetConsumer, onTimeoutConsumer, errorConsumer);
+ return new DefaultSendRequestContext<R>(commands, timeout, finisher, request, context, expectClazz, packetConsumer, onTimeoutConsumer, errorConsumer);
}
@Override
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java
index 2fa47a0..060a17c 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java
@@ -21,6 +21,7 @@ package org.apache.plc4x.java.spi.internal;
import io.vavr.control.Either;
+import java.time.Instant;
import java.util.Deque;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
@@ -43,13 +44,15 @@ public class HandlerRegistration {
private final Consumer<TimeoutException> onTimeoutConsumer;
private final BiConsumer<?, ? extends Throwable> errorConsumer;
+ private final Instant timeout;
- public HandlerRegistration(Deque<Either<Function<?, ?>, Predicate<?>>> commands, Class<?> expectClazz, Consumer<?> packetConsumer, Consumer<TimeoutException> onTimeoutConsumer, BiConsumer<?, ? extends Throwable> errorConsumer) {
+ public HandlerRegistration(Deque<Either<Function<?, ?>, Predicate<?>>> commands, Class<?> expectClazz, Consumer<?> packetConsumer, Consumer<TimeoutException> onTimeoutConsumer, BiConsumer<?, ? extends Throwable> errorConsumer, Instant timeout) {
this.commands = commands;
this.expectClazz = expectClazz;
this.packetConsumer = packetConsumer;
this.onTimeoutConsumer = onTimeoutConsumer;
this.errorConsumer = errorConsumer;
+ this.timeout = timeout;
}
public Deque<Either<Function<?, ?>, Predicate<?>>> getCommands() {
@@ -72,6 +75,10 @@ public class HandlerRegistration {
return errorConsumer;
}
+ public Instant getTimeout() {
+ return timeout;
+ }
+
@Override public String toString() {
return "HandlerRegistration#" + id;
}
diff --git a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/Plc4xS7Protocol.java b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/Plc4xS7Protocol.java
index 70e91d2..14fabb5 100644
--- a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/Plc4xS7Protocol.java
+++ b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/Plc4xS7Protocol.java
@@ -89,12 +89,12 @@ public class Plc4xS7Protocol extends Plc4xProtocolBase<TPKTPacket> {
TPKTPacket packet = new TPKTPacket(createCOTPConnectionRequest(calledTsapId, callingTsapId, cotpTpduSize));
context.sendRequest(packet)
- .expectResponse(TPKTPacket.class, Duration.ofMillis(100))
+ .expectResponse(TPKTPacket.class, Duration.ofMillis(1000))
.check(p -> p.getPayload() instanceof COTPPacketConnectionResponse)
.unwrap(p -> (COTPPacketConnectionResponse) p.getPayload())
.handle(cotpPacketConnectionResponse -> {
context.sendRequest(createS7ConnectionRequest(cotpPacketConnectionResponse))
- .expectResponse(TPKTPacket.class, Duration.ofMillis(100))
+ .expectResponse(TPKTPacket.class, Duration.ofMillis(1000))
.unwrap(TPKTPacket::getPayload)
.only(COTPPacketData.class)
.unwrap(COTPPacket::getPayload)
@@ -116,7 +116,7 @@ public class Plc4xS7Protocol extends Plc4xProtocolBase<TPKTPacket> {
// Prepare a message to request the remote to identify itself.
TPKTPacket tpktPacket = createIdentifyRemoteMessage();
context.sendRequest(tpktPacket)
- .expectResponse(TPKTPacket.class, Duration.ofMillis(100))
+ .expectResponse(TPKTPacket.class, Duration.ofMillis(1000))
.check(p -> p.getPayload() instanceof COTPPacketData)
.unwrap(p -> ((COTPPacketData) p.getPayload()))
.check(p -> p.getPayload() instanceof S7MessageUserData)