You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2020/01/05 22:59:54 UTC
[plc4x] branch next-gen-core updated: - Continued working on the
futures for handling the responses.
This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch next-gen-core
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/next-gen-core by this push:
new aa76159 - Continued working on the futures for handling the responses.
aa76159 is described below
commit aa7615985b01199be407fda094bf1dae14548e8b
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Sun Jan 5 23:59:42 2020 +0100
- Continued working on the futures for handling the responses.
---
.../s7/readwrite/protocol/S7ProtocolLogic.java | 176 ++++++++++++---------
1 file changed, 104 insertions(+), 72 deletions(-)
diff --git a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
index 43ed851..cd23871 100644
--- a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
+++ b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
@@ -57,6 +57,7 @@ import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
@@ -202,7 +203,7 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> implements Ha
//
/////////////////////////////////////////////////////////////////
ParentFuture multiRequestFuture = new ParentFuture(
- (result) -> {
+ result -> {
try {
final S7MessageResponse s7MessageResponse =
processor.processResponse(s7MessageRequest, result);
@@ -214,7 +215,7 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> implements Ha
}
});
for (S7MessageRequest messageRequest : s7MessageRequests) {
- CompletableFuture<PlcReadResponse> childFuture = new CompletableFuture<>();
+ ChildFuture childFuture = new ChildFuture(messageRequest);
multiRequestFuture.addChildFuture(childFuture);
TPKTPacket tpktPacket = new TPKTPacket(new COTPPacketData(null, messageRequest, true,
@@ -222,27 +223,21 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> implements Ha
// Send the packet
RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
- transaction.submit(() -> {
- context.sendRequest(tpktPacket)
- .expectResponse(TPKTPacket.class, REQUEST_TIMEOUT)
- .onTimeout(childFuture::completeExceptionally)
- .onError((p, e) -> childFuture.completeExceptionally(e))
- .check(p -> p.getPayload() instanceof COTPPacketData)
- .unwrap(p -> ((COTPPacketData) p.getPayload()))
- .check(p -> p.getPayload() instanceof S7MessageResponse)
- .unwrap(p -> ((S7MessageResponse) p.getPayload()))
- .check(p -> p.getTpduReference() == tpduId)
- .check(p -> p.getParameter() instanceof S7ParameterReadVarResponse)
- .handle(p -> {
- try {
- childFuture.complete(((PlcReadResponse) decodeReadResponse(p, ((InternalPlcReadRequest) readRequest))));
- } catch (PlcProtocolException e) {
- logger.warn(String.format("Error sending 'read' message: '%s'", e.getMessage()), e);
- }
- // Finish the request-transaction.
- transaction.endRequest();
- });
- });
+ transaction.submit(() -> context.sendRequest(tpktPacket)
+ .expectResponse(TPKTPacket.class, REQUEST_TIMEOUT)
+ .onTimeout(childFuture::completeExceptionally)
+ .onError((p, e) -> childFuture.completeExceptionally(e))
+ .check(p -> p.getPayload() instanceof COTPPacketData)
+ .unwrap(p -> ((COTPPacketData) p.getPayload()))
+ .check(p -> p.getPayload() instanceof S7MessageResponse)
+ .unwrap(p -> ((S7MessageResponse) p.getPayload()))
+ .check(p -> p.getTpduReference() == tpduId)
+ .check(p -> p.getParameter() instanceof S7ParameterReadVarResponse)
+ .handle(p -> {
+ childFuture.complete(p);
+ // Finish the request-transaction.
+ transaction.endRequest();
+ }));
}
return multiRequestFuture;
}
@@ -258,27 +253,25 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> implements Ha
TPKTPacket tpktPacket = new TPKTPacket(new COTPPacketData(null, s7MessageRequest, true, (short) tpduId));
// Start a new request-transaction (Is ended in the response-handler)
RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
- transaction.submit(() -> {
- context.sendRequest(tpktPacket)
- .expectResponse(TPKTPacket.class, REQUEST_TIMEOUT)
- .onTimeout(future::completeExceptionally)
- .onError((p, e) -> future.completeExceptionally(e))
- .check(p -> p.getPayload() instanceof COTPPacketData)
- .unwrap(p -> ((COTPPacketData) p.getPayload()))
- .check(p -> p.getPayload() instanceof S7MessageResponse)
- .unwrap(p -> ((S7MessageResponse) p.getPayload()))
- .check(p -> p.getTpduReference() == tpduId)
- .check(p -> p.getParameter() instanceof S7ParameterReadVarResponse)
- .handle(p -> {
- try {
- future.complete(((PlcReadResponse) decodeReadResponse(p, ((InternalPlcReadRequest) readRequest))));
- } catch (PlcProtocolException e) {
- logger.warn(String.format("Error sending 'read' message: '%s'", e.getMessage()), e);
- }
- // Finish the request-transaction.
- transaction.endRequest();
- });
- });
+ transaction.submit(() -> context.sendRequest(tpktPacket)
+ .expectResponse(TPKTPacket.class, REQUEST_TIMEOUT)
+ .onTimeout(future::completeExceptionally)
+ .onError((p, e) -> future.completeExceptionally(e))
+ .check(p -> p.getPayload() instanceof COTPPacketData)
+ .unwrap(p -> ((COTPPacketData) p.getPayload()))
+ .check(p -> p.getPayload() instanceof S7MessageResponse)
+ .unwrap(p -> ((S7MessageResponse) p.getPayload()))
+ .check(p -> p.getTpduReference() == tpduId)
+ .check(p -> p.getParameter() instanceof S7ParameterReadVarResponse)
+ .handle(p -> {
+ try {
+ future.complete(((PlcReadResponse) decodeReadResponse(p, ((InternalPlcReadRequest) readRequest))));
+ } catch (PlcProtocolException e) {
+ logger.warn(String.format("Error sending 'read' message: '%s'", e.getMessage()), e);
+ }
+ // Finish the request-transaction.
+ transaction.endRequest();
+ }));
return future;
}
@@ -303,27 +296,25 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> implements Ha
// Start a new request-transaction (Is ended in the response-handler)
RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
- transaction.submit(() -> {
- context.sendRequest(tpktPacket)
- .expectResponse(TPKTPacket.class, REQUEST_TIMEOUT)
- .onTimeout(future::completeExceptionally)
- .onError((p, e) -> future.completeExceptionally(e))
- .check(p -> p.getPayload() instanceof COTPPacketData)
- .unwrap(p -> ((COTPPacketData) p.getPayload()))
- .check(p -> p.getPayload() instanceof S7MessageResponse)
- .unwrap(p -> ((S7MessageResponse) p.getPayload()))
- .check(p -> p.getTpduReference() == tpduId)
- .check(p -> p.getParameter() instanceof S7ParameterWriteVarResponse)
- .handle(p -> {
- try {
- future.complete(((PlcWriteResponse) decodeWriteResponse(p, ((InternalPlcWriteRequest) writeRequest))));
- } catch (PlcProtocolException e) {
- logger.warn(String.format("Error sending 'write' message: '%s'", e.getMessage()), e);
- }
- // Finish the request-transaction.
- transaction.endRequest();
- });
- });
+ transaction.submit(() -> context.sendRequest(tpktPacket)
+ .expectResponse(TPKTPacket.class, REQUEST_TIMEOUT)
+ .onTimeout(future::completeExceptionally)
+ .onError((p, e) -> future.completeExceptionally(e))
+ .check(p -> p.getPayload() instanceof COTPPacketData)
+ .unwrap(p -> ((COTPPacketData) p.getPayload()))
+ .check(p -> p.getPayload() instanceof S7MessageResponse)
+ .unwrap(p -> ((S7MessageResponse) p.getPayload()))
+ .check(p -> p.getTpduReference() == tpduId)
+ .check(p -> p.getParameter() instanceof S7ParameterWriteVarResponse)
+ .handle(p -> {
+ try {
+ future.complete(((PlcWriteResponse) decodeWriteResponse(p, ((InternalPlcWriteRequest) writeRequest))));
+ } catch (PlcProtocolException e) {
+ logger.warn(String.format("Error sending 'write' message: '%s'", e.getMessage()), e);
+ }
+ // Finish the request-transaction.
+ transaction.endRequest();
+ }));
return future;
}
@@ -563,28 +554,69 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> implements Ha
return null;
}
- private class ParentFuture extends CompletableFuture<PlcReadResponse> {
+ private static class ChildFuture extends CompletableFuture<S7MessageResponse> {
+
+ private final S7MessageRequest request;
+
+ private Throwable exception;
+
+ public ChildFuture(S7MessageRequest request) {
+ this.request = request;
+ }
+
+ @Override
+ public boolean completeExceptionally(Throwable exception) {
+ this.exception = exception;
+ return super.completeExceptionally(exception);
+ }
+
+ public S7MessageRequest getRequest() {
+ return request;
+ }
+
+ public Throwable getException() {
+ return exception;
+ }
+
+ }
+
+ private static class ParentFuture extends CompletableFuture<PlcReadResponse> {
private final Consumer<Map<S7MessageRequest, Either<S7MessageResponse, Throwable>>> action;
- private final List<CompletableFuture<PlcReadResponse>> childFutures;
+ private final List<ChildFuture> childFutures;
public ParentFuture(Consumer<Map<S7MessageRequest, Either<S7MessageResponse, Throwable>>> action) {
this.action = action;
this.childFutures = new LinkedList<>();
}
- public void addChildFuture(CompletableFuture<PlcReadResponse> childFuture) {
- childFuture.whenComplete((value, throwable) -> {
+ public void addChildFuture(ChildFuture future) {
+ future.whenComplete((value, throwable) -> {
// If all futures are done (None are not done), do the completion action.
if(childFutures.stream().allMatch(CompletableFuture::isDone)) {
- // TODO: Create the result list of S7MessageResponses
- Map<S7MessageRequest, Either<S7MessageResponse, Throwable>> result = null;
+ Map<S7MessageRequest, Either<S7MessageResponse, Throwable>> result = new HashMap<>();
+ childFutures.forEach(childFuture -> {
+ Either<S7MessageResponse, Throwable> subResult = null;
+ if (childFuture.isCancelled()) {
+ subResult = Either.right(new Exception("Cancelled"));
+ } else if (childFuture.isCompletedExceptionally()) {
+ // Get the original exception and pass it along.
+ subResult = Either.right(childFuture.getException());
+ } else {
+ try {
+ subResult = Either.left(childFuture.get());
+ } catch (InterruptedException | ExecutionException e) {
+ e.printStackTrace();
+ }
+ }
+ result.put(childFuture.getRequest(), subResult);
+ });
// Call the result handler and pass in the list of results.
action.accept(result);
}
});
- childFutures.add(childFuture);
+ childFutures.add(future);
}
}