You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2020/01/05 22:59:54 UTC

[plc4x] branch next-gen-core updated: - Continued working on the futures for handling the responses.

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch next-gen-core
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/next-gen-core by this push:
     new aa76159  - Continued working on the futures for handling the responses.
aa76159 is described below

commit aa7615985b01199be407fda094bf1dae14548e8b
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Sun Jan 5 23:59:42 2020 +0100

    - Continued working on the futures for handling the responses.
---
 .../s7/readwrite/protocol/S7ProtocolLogic.java     | 176 ++++++++++++---------
 1 file changed, 104 insertions(+), 72 deletions(-)

diff --git a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
index 43ed851..cd23871 100644
--- a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
+++ b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
@@ -57,6 +57,7 @@ import org.slf4j.LoggerFactory;
 import java.time.Duration;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
@@ -202,7 +203,7 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> implements Ha
                     //
                     /////////////////////////////////////////////////////////////////
                     ParentFuture multiRequestFuture = new ParentFuture(
-                        (result) -> {
+                        result -> {
                             try {
                                 final S7MessageResponse s7MessageResponse =
                                     processor.processResponse(s7MessageRequest, result);
@@ -214,7 +215,7 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> implements Ha
                             }
                         });
                     for (S7MessageRequest messageRequest : s7MessageRequests) {
-                        CompletableFuture<PlcReadResponse> childFuture = new CompletableFuture<>();
+                        ChildFuture childFuture = new ChildFuture(messageRequest);
                         multiRequestFuture.addChildFuture(childFuture);
 
                         TPKTPacket tpktPacket = new TPKTPacket(new COTPPacketData(null, messageRequest, true,
@@ -222,27 +223,21 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> implements Ha
 
                         // Send the packet
                         RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
-                        transaction.submit(() -> {
-                            context.sendRequest(tpktPacket)
-                                .expectResponse(TPKTPacket.class, REQUEST_TIMEOUT)
-                                .onTimeout(childFuture::completeExceptionally)
-                                .onError((p, e) -> childFuture.completeExceptionally(e))
-                                .check(p -> p.getPayload() instanceof COTPPacketData)
-                                .unwrap(p -> ((COTPPacketData) p.getPayload()))
-                                .check(p -> p.getPayload() instanceof S7MessageResponse)
-                                .unwrap(p -> ((S7MessageResponse) p.getPayload()))
-                                .check(p -> p.getTpduReference() == tpduId)
-                                .check(p -> p.getParameter() instanceof S7ParameterReadVarResponse)
-                                .handle(p -> {
-                                    try {
-                                        childFuture.complete(((PlcReadResponse) decodeReadResponse(p, ((InternalPlcReadRequest) readRequest))));
-                                    } catch (PlcProtocolException e) {
-                                        logger.warn(String.format("Error sending 'read' message: '%s'", e.getMessage()), e);
-                                    }
-                                    // Finish the request-transaction.
-                                    transaction.endRequest();
-                                });
-                        });
+                        transaction.submit(() -> context.sendRequest(tpktPacket)
+                            .expectResponse(TPKTPacket.class, REQUEST_TIMEOUT)
+                            .onTimeout(childFuture::completeExceptionally)
+                            .onError((p, e) -> childFuture.completeExceptionally(e))
+                            .check(p -> p.getPayload() instanceof COTPPacketData)
+                            .unwrap(p -> ((COTPPacketData) p.getPayload()))
+                            .check(p -> p.getPayload() instanceof S7MessageResponse)
+                            .unwrap(p -> ((S7MessageResponse) p.getPayload()))
+                            .check(p -> p.getTpduReference() == tpduId)
+                            .check(p -> p.getParameter() instanceof S7ParameterReadVarResponse)
+                            .handle(p -> {
+                                childFuture.complete(p);
+                                // Finish the request-transaction.
+                                transaction.endRequest();
+                            }));
                     }
                     return multiRequestFuture;
                 }
@@ -258,27 +253,25 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> implements Ha
         TPKTPacket tpktPacket = new TPKTPacket(new COTPPacketData(null, s7MessageRequest, true, (short) tpduId));
         // Start a new request-transaction (Is ended in the response-handler)
         RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
-        transaction.submit(() -> {
-            context.sendRequest(tpktPacket)
-                .expectResponse(TPKTPacket.class, REQUEST_TIMEOUT)
-                .onTimeout(future::completeExceptionally)
-                .onError((p, e) -> future.completeExceptionally(e))
-                .check(p -> p.getPayload() instanceof COTPPacketData)
-                .unwrap(p -> ((COTPPacketData) p.getPayload()))
-                .check(p -> p.getPayload() instanceof S7MessageResponse)
-                .unwrap(p -> ((S7MessageResponse) p.getPayload()))
-                .check(p -> p.getTpduReference() == tpduId)
-                .check(p -> p.getParameter() instanceof S7ParameterReadVarResponse)
-                .handle(p -> {
-                    try {
-                        future.complete(((PlcReadResponse) decodeReadResponse(p, ((InternalPlcReadRequest) readRequest))));
-                    } catch (PlcProtocolException e) {
-                        logger.warn(String.format("Error sending 'read' message: '%s'", e.getMessage()), e);
-                    }
-                    // Finish the request-transaction.
-                    transaction.endRequest();
-                });
-        });
+        transaction.submit(() -> context.sendRequest(tpktPacket)
+            .expectResponse(TPKTPacket.class, REQUEST_TIMEOUT)
+            .onTimeout(future::completeExceptionally)
+            .onError((p, e) -> future.completeExceptionally(e))
+            .check(p -> p.getPayload() instanceof COTPPacketData)
+            .unwrap(p -> ((COTPPacketData) p.getPayload()))
+            .check(p -> p.getPayload() instanceof S7MessageResponse)
+            .unwrap(p -> ((S7MessageResponse) p.getPayload()))
+            .check(p -> p.getTpduReference() == tpduId)
+            .check(p -> p.getParameter() instanceof S7ParameterReadVarResponse)
+            .handle(p -> {
+                try {
+                    future.complete(((PlcReadResponse) decodeReadResponse(p, ((InternalPlcReadRequest) readRequest))));
+                } catch (PlcProtocolException e) {
+                    logger.warn(String.format("Error sending 'read' message: '%s'", e.getMessage()), e);
+                }
+                // Finish the request-transaction.
+                transaction.endRequest();
+            }));
         return future;
     }
 
@@ -303,27 +296,25 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> implements Ha
 
         // Start a new request-transaction (Is ended in the response-handler)
         RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
-        transaction.submit(() -> {
-            context.sendRequest(tpktPacket)
-                .expectResponse(TPKTPacket.class, REQUEST_TIMEOUT)
-                .onTimeout(future::completeExceptionally)
-                .onError((p, e) -> future.completeExceptionally(e))
-                .check(p -> p.getPayload() instanceof COTPPacketData)
-                .unwrap(p -> ((COTPPacketData) p.getPayload()))
-                .check(p -> p.getPayload() instanceof S7MessageResponse)
-                .unwrap(p -> ((S7MessageResponse) p.getPayload()))
-                .check(p -> p.getTpduReference() == tpduId)
-                .check(p -> p.getParameter() instanceof S7ParameterWriteVarResponse)
-                .handle(p -> {
-                    try {
-                        future.complete(((PlcWriteResponse) decodeWriteResponse(p, ((InternalPlcWriteRequest) writeRequest))));
-                    } catch (PlcProtocolException e) {
-                        logger.warn(String.format("Error sending 'write' message: '%s'", e.getMessage()), e);
-                    }
-                    // Finish the request-transaction.
-                    transaction.endRequest();
-                });
-        });
+        transaction.submit(() -> context.sendRequest(tpktPacket)
+            .expectResponse(TPKTPacket.class, REQUEST_TIMEOUT)
+            .onTimeout(future::completeExceptionally)
+            .onError((p, e) -> future.completeExceptionally(e))
+            .check(p -> p.getPayload() instanceof COTPPacketData)
+            .unwrap(p -> ((COTPPacketData) p.getPayload()))
+            .check(p -> p.getPayload() instanceof S7MessageResponse)
+            .unwrap(p -> ((S7MessageResponse) p.getPayload()))
+            .check(p -> p.getTpduReference() == tpduId)
+            .check(p -> p.getParameter() instanceof S7ParameterWriteVarResponse)
+            .handle(p -> {
+                try {
+                    future.complete(((PlcWriteResponse) decodeWriteResponse(p, ((InternalPlcWriteRequest) writeRequest))));
+                } catch (PlcProtocolException e) {
+                    logger.warn(String.format("Error sending 'write' message: '%s'", e.getMessage()), e);
+                }
+                // Finish the request-transaction.
+                transaction.endRequest();
+            }));
         return future;
     }
 
@@ -563,28 +554,69 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> implements Ha
         return null;
     }
 
-    private class ParentFuture extends CompletableFuture<PlcReadResponse> {
+    private static class ChildFuture extends CompletableFuture<S7MessageResponse> {
+
+        private final S7MessageRequest request;
+
+        private Throwable exception;
+
+        public ChildFuture(S7MessageRequest request) {
+            this.request = request;
+        }
+
+        @Override
+        public boolean completeExceptionally(Throwable exception) {
+            this.exception = exception;
+            return super.completeExceptionally(exception);
+        }
+
+        public S7MessageRequest getRequest() {
+            return request;
+        }
+
+        public Throwable getException() {
+            return exception;
+        }
+
+    }
+
+    private static class ParentFuture extends CompletableFuture<PlcReadResponse> {
 
         private final Consumer<Map<S7MessageRequest, Either<S7MessageResponse, Throwable>>> action;
-        private final List<CompletableFuture<PlcReadResponse>> childFutures;
+        private final List<ChildFuture> childFutures;
 
         public ParentFuture(Consumer<Map<S7MessageRequest, Either<S7MessageResponse, Throwable>>> action) {
             this.action = action;
             this.childFutures = new LinkedList<>();
         }
 
-        public void addChildFuture(CompletableFuture<PlcReadResponse> childFuture) {
-            childFuture.whenComplete((value, throwable) -> {
+        public void addChildFuture(ChildFuture future) {
+            future.whenComplete((value, throwable) -> {
                 // If all futures are done (None are not done), do the completion action.
                 if(childFutures.stream().allMatch(CompletableFuture::isDone)) {
-                    // TODO: Create the result list of S7MessageResponses
-                    Map<S7MessageRequest, Either<S7MessageResponse, Throwable>> result = null;
+                    Map<S7MessageRequest, Either<S7MessageResponse, Throwable>> result = new HashMap<>();
+                    childFutures.forEach(childFuture -> {
+                        Either<S7MessageResponse, Throwable> subResult = null;
+                        if (childFuture.isCancelled()) {
+                            subResult = Either.right(new Exception("Cancelled"));
+                        } else if (childFuture.isCompletedExceptionally()) {
+                            // Get the original exception and pass it along.
+                            subResult = Either.right(childFuture.getException());
+                        } else {
+                            try {
+                                subResult = Either.left(childFuture.get());
+                            } catch (InterruptedException | ExecutionException e) {
+                                e.printStackTrace();
+                            }
+                        }
+                        result.put(childFuture.getRequest(), subResult);
+                    });
 
                     // Call the result handler and pass in the list of results.
                     action.accept(result);
                 }
             });
-            childFutures.add(childFuture);
+            childFutures.add(future);
         }
 
     }