You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by jf...@apache.org on 2019/12/28 22:04:44 UTC

[plc4x] branch next-gen-core updated: Implemented Generic RequestTransactionManager and implemented it in S7 Protocol Logic.

This is an automated email from the ASF dual-hosted git repository.

jfeinauer pushed a commit to branch next-gen-core
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/next-gen-core by this push:
     new 8a3576b  Implemented Generic RequestTransactionManager and implemented it in S7 Protocol Logic.
8a3576b is described below

commit 8a3576b0059993d99bd1af288c4a8f0445118cc7
Author: Julian Feinauer <j....@pragmaticminds.de>
AuthorDate: Sat Dec 28 23:04:03 2019 +0100

    Implemented Generic RequestTransactionManager and implemented it in S7 Protocol Logic.
---
 .../spi/optimizer/RequestTransactionManager.java   | 180 +++++++++++++++++++++
 .../optimizer/RequestTransactionManagerTest.java   | 170 +++++++++++++++++++
 sandbox/test-java-s7-driver/pom.xml                |   5 +
 .../s7/readwrite/protocol/S7ProtocolLogic.java     |  86 ++++++----
 4 files changed, 407 insertions(+), 34 deletions(-)

diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/optimizer/RequestTransactionManager.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/optimizer/RequestTransactionManager.java
new file mode 100644
index 0000000..d90ad37
--- /dev/null
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/optimizer/RequestTransactionManager.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.spi.optimizer;
+
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+/**
+ * This is a limited Queue of Requests, a Protocol can use.
+ * <p>
+ * The Following Steps are necessary
+ * <ul>
+ *     <li>Register Slot</li>
+ *     <li>Pass Runnable</li>
+ *     <li>On Request or Response unregister Slot</li>
+ * </ul>
+ */
+public class RequestTransactionManager {
+
+    /** Executor that performs all operations */
+    static final ExecutorService executor = Executors.newFixedThreadPool(4);
+    private final Set<RequestTransaction> runningRequests;
+    /** How many Transactions are allowed to run at the same time? */
+    private int numberOfConcurrentRequests;
+    /** Assigns each request a Unique Transaction Id, especially important for failure handling */
+    private AtomicInteger transactionId = new AtomicInteger(0);
+    /** Important, this is a FIFO Queue for Fairness! */
+    private Queue<RequestTransaction> workLog = new ConcurrentLinkedQueue<>();
+
+    public RequestTransactionManager(int numberOfConcurrentRequests) {
+        this.numberOfConcurrentRequests = numberOfConcurrentRequests;
+        // Thread-safe (mutable) Set view backed by a ConcurrentHashMap
+        runningRequests = ConcurrentHashMap.newKeySet();
+    }
+
+    public RequestTransactionManager() {
+        this(1);
+    }
+
+    public int getNumberOfConcurrentRequests() {
+        return numberOfConcurrentRequests;
+    }
+
+    public void setNumberOfConcurrentRequests(int numberOfConcurrentRequests) {
+        this.numberOfConcurrentRequests = numberOfConcurrentRequests;
+    }
+
+    public void submit(Consumer<RequestTransaction> context) {
+        RequestTransaction transaction = startRequest();
+        context.accept(transaction);
+        // this.submit(transaction);
+    }
+
+    void submit(RequestTransaction handle) {
+        assert handle.operation != null;
+        // Add this Request with this handle in the Worklog
+        // Put Transaction into Worklog
+        this.workLog.add(handle);
+        // Try to Process the Worklog
+        processWorklog();
+    }
+
+    private void processWorklog() {
+        while (runningRequests.size() < getNumberOfConcurrentRequests() && !workLog.isEmpty()) {
+            RequestTransaction next = workLog.remove();
+            this.runningRequests.add(next);
+            Future<?> completionFuture = executor.submit(next.operation);
+            next.setCompletionFuture(completionFuture);
+        }
+    }
+
+
+    public RequestTransaction startRequest() {
+        return new RequestTransaction(this, this.transactionId.getAndIncrement());
+    }
+
+    public int getNumberOfActiveRequests() {
+        return this.runningRequests.size();
+    }
+
+    private void failRequest(RequestTransaction transaction) {
+        // Try to fail it!
+        transaction.getCompletionFuture().cancel(true);
+        // End it
+        endRequest(transaction);
+    }
+
+    private void endRequest(RequestTransaction transaction) {
+        if (!this.runningRequests.contains(transaction)) {
+            throw new IllegalArgumentException("Unknown Transaction or Transaction already finished!");
+        }
+        this.runningRequests.remove(transaction);
+        // Process the worklog, a slot should be free now
+        processWorklog();
+    }
+
+    public static class RequestTransaction {
+
+        private final RequestTransactionManager parent;
+        private final int transactionId;
+
+        /** The initial operation to perform to kick off the request */
+        private Runnable operation;
+        private Future<?> completionFuture;
+
+        public RequestTransaction(RequestTransactionManager parent, int transactionId) {
+            this.parent = parent;
+            this.transactionId = transactionId;
+        }
+
+        public void start() {
+
+        }
+
+        public void failRequest(Throwable t) {
+            this.parent.failRequest(this);
+        }
+
+        public void endRequest() {
+            // Remove it from Running Requests
+            this.parent.endRequest(this);
+        }
+
+        public void setOperation(Runnable operation) {
+            this.operation = operation;
+        }
+
+        public Future<?> getCompletionFuture() {
+            return completionFuture;
+        }
+
+        public void setCompletionFuture(Future<?> completionFuture) {
+            this.completionFuture = completionFuture;
+        }
+
+        public void submit(Runnable operation) {
+            this.setOperation(operation);
+            this.parent.submit(this);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            RequestTransaction that = (RequestTransaction) o;
+            return transactionId == that.transactionId;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(transactionId);
+        }
+    }
+
+}
diff --git a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/optimizer/RequestTransactionManagerTest.java b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/optimizer/RequestTransactionManagerTest.java
new file mode 100644
index 0000000..e92adc4
--- /dev/null
+++ b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/optimizer/RequestTransactionManagerTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.spi.optimizer;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class RequestTransactionManagerTest {
+
+    private static final Logger logger = LoggerFactory.getLogger(RequestTransactionManagerTest.class);
+
+    @Test
+    public void simpleExample() throws ExecutionException, InterruptedException {
+        CompletableFuture<Void> sendRequest = new CompletableFuture<>();
+        CompletableFuture<Void> receiveResponse = new CompletableFuture<>();
+        CompletableFuture<Void> transactionIsFinished = new CompletableFuture<>();
+
+        RequestTransactionManager tm = new RequestTransactionManager();
+
+        // Assert there is no request going on
+        assertEquals(0, tm.getNumberOfActiveRequests());
+        // Send a Request
+        sendRequest(tm, sendRequest, receiveResponse, transactionIsFinished);
+        // Assert that there is a request going on
+        sendRequest.get();
+        assertEquals(1, tm.getNumberOfActiveRequests());
+        // Finish the Request with a response
+        receiveResponse.complete(null);
+        // Wait till async operation completed
+        transactionIsFinished.get();
+        // Here, there should no longer be a running request
+        // Assert that there is no request going on
+        assertEquals(0, tm.getNumberOfActiveRequests());
+    }
+
+    @Test
+    public void exampleWithMultipleRequests() throws ExecutionException, InterruptedException {
+        CompletableFuture<Void> sendRequest1 = new CompletableFuture<>();
+        CompletableFuture<Void> endRequest1 = new CompletableFuture<>();
+        CompletableFuture<Void> requestIsEnded1 = new CompletableFuture<>();
+        CompletableFuture<Void> sendRequest2 = new CompletableFuture<>();
+        CompletableFuture<Void> endRequest2 = new CompletableFuture<>();
+        CompletableFuture<Void> requestIsEnded2 = new CompletableFuture<>();
+
+        RequestTransactionManager tm = new RequestTransactionManager();
+
+        // Assert there is no request going on
+        assertEquals(0, tm.getNumberOfActiveRequests());
+        // Send Request 1
+        sendRequest(tm, sendRequest1, endRequest1, requestIsEnded1);
+        // Send Request 2
+        sendRequest(tm, sendRequest2, endRequest2, requestIsEnded2);
+
+        // Assert that there is a request going on
+        sendRequest1.get();
+        assertEquals(1, tm.getNumberOfActiveRequests());
+        // Finish the Request with a response
+        endRequest1.complete(null);
+        // Wait till async operation (and transaction end) completed
+        requestIsEnded1.get();
+        // Request 2 should now be processed and finish
+        sendRequest2.get();
+        endRequest2.complete(null);
+        requestIsEnded2.get();
+        assertEquals(0, tm.getNumberOfActiveRequests());
+    }
+
+    @Test
+    public void version2() throws ExecutionException, InterruptedException {
+        CompletableFuture<Void> sendRequest = new CompletableFuture<>();
+        CompletableFuture<Void> receiveResponse = new CompletableFuture<>();
+        CompletableFuture<Void> transactionIsFinished = new CompletableFuture<>();
+
+        RequestTransactionManager tm = new RequestTransactionManager();
+        RequestTransactionManager.RequestTransaction handle = tm.startRequest();
+        handle.submit(() -> {
+            // ...
+            sendRequest.complete(null);
+            // Receive
+            receiveResponse.thenAccept((n) -> {
+                handle.endRequest();
+                transactionIsFinished.complete(null);
+            });
+        });
+
+//        // Exception case
+//        handle.failRequest(new RuntimeException());
+
+        // Assert that there is a request going on
+        sendRequest.get();
+        assertEquals(1, tm.getNumberOfActiveRequests());
+        // Finish the Request with a response
+        receiveResponse.complete(null);
+        // Wait till async operation completed
+        transactionIsFinished.get();
+        // Here, there should no longer be a running request
+        // Assert that there is no request going on
+        assertEquals(0, tm.getNumberOfActiveRequests());
+    }
+
+    @Test
+    public void abortTransactionFromExternally() throws ExecutionException, InterruptedException {
+        CompletableFuture<Void> sendRequest = new CompletableFuture<>();
+        CompletableFuture<Void> receiveResponse = new CompletableFuture<>();
+        CompletableFuture<Void> transactionIsFinished = new CompletableFuture<>();
+
+        RequestTransactionManager tm = new RequestTransactionManager();
+        RequestTransactionManager.RequestTransaction handle = tm.startRequest();
+        handle.submit(() -> {
+            // ...
+            sendRequest.complete(null);
+            // Receive
+            receiveResponse.thenAccept((n) -> {
+                handle.endRequest();
+                transactionIsFinished.complete(null);
+            });
+        });
+
+        // Assert that there is a request going on
+        sendRequest.get();
+
+        // Exception case
+        handle.failRequest(new RuntimeException());
+
+        // Wait some millis?
+        assertEquals(0, tm.getNumberOfActiveRequests());
+
+        // Assert that it's cancelled
+        assertTrue(handle.getCompletionFuture().isCancelled());
+    }
+
+    private void sendRequest(RequestTransactionManager tm, CompletableFuture<Void> sendRequest, CompletableFuture<Void> endRequest, CompletableFuture<Void> requestIsEnded) {
+        tm.submit(handle -> {
+            handle.submit(() -> {
+                // Wait till the Request is sent
+                sendRequest.complete(null);
+                // Receive
+                endRequest.thenAccept((n) -> {
+                    handle.endRequest();
+                    requestIsEnded.complete(null);
+                });
+            });
+        });
+    }
+
+}
\ No newline at end of file
diff --git a/sandbox/test-java-s7-driver/pom.xml b/sandbox/test-java-s7-driver/pom.xml
index e8e1210..f014a3a 100644
--- a/sandbox/test-java-s7-driver/pom.xml
+++ b/sandbox/test-java-s7-driver/pom.xml
@@ -108,6 +108,11 @@
       <!-- Scope is 'provided' as this way it's not shipped with the driver -->
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
 </project>
diff --git a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
index 427f560..3b7c457 100644
--- a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
+++ b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
@@ -47,6 +47,7 @@ import org.apache.plc4x.java.spi.messages.*;
 import org.apache.plc4x.java.spi.generation.ParseException;
 import org.apache.plc4x.java.spi.generation.ReadBuffer;
 import org.apache.plc4x.java.spi.generation.WriteBuffer;
+import org.apache.plc4x.java.spi.optimizer.RequestTransactionManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,6 +59,11 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 
+/**
+ * The S7 Protocol states that there can not be more than {@code min(maxAmqCaller, maxAmqCallee)} "ongoing" requests.
+ * So we need to limit those.
+ * Thus, each request goes to a Work Queue and this Queue ensures that no more than that many are open at the same time.
+ */
 public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> implements HasConfiguration<S7Configuration> {
 
     private static final Logger logger = LoggerFactory.getLogger(S7ProtocolLogic.class);
@@ -72,6 +78,7 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> implements Ha
     private S7ControllerType controllerType;
 
     private static final AtomicInteger tpduGenerator = new AtomicInteger(10);
+    private RequestTransactionManager tm;
 
     @Override public void setConfiguration(S7Configuration configuration) {
         this.callingTsapId = S7TsapIdEncoder.encodeS7TsapId(DeviceGroup.PG_OR_PC, configuration.rack, configuration.slot);
@@ -84,6 +91,9 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> implements Ha
         this.pduSize = cotpTpduSize.getSizeInBytes() - 16;
         this.maxAmqCaller = configuration.maxAmqCaller;
         this.maxAmqCallee = configuration.maxAmqCallee;
+
+        // Initialize Transaction Manager
+        this.tm = new RequestTransactionManager(Math.min(this.maxAmqCallee, this.maxAmqCaller));
     }
 
     @Override
@@ -152,23 +162,27 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> implements Ha
                 new S7PayloadReadVarRequest()),
             true, (short) tpduId));
 
-        context.sendRequest(tpktPacket)
-            .expectResponse(TPKTPacket.class, REQUEST_TIMEOUT)
-            .onTimeout(future::completeExceptionally)
-            .onError((p, e) -> future.completeExceptionally(e))
-            .check(p -> p.getPayload() instanceof COTPPacketData)
-            .unwrap(p -> ((COTPPacketData) p.getPayload()))
-            .check(p -> p.getPayload() instanceof S7MessageResponse)
-            .unwrap(p -> ((S7MessageResponse) p.getPayload()))
-            .check(p -> p.getTpduReference() == tpduId)
-            .check(p -> p.getParameter() instanceof S7ParameterReadVarResponse)
-            .handle(p -> {
-                try {
-                    future.complete(((PlcReadResponse) decodeReadResponse(p, ((InternalPlcReadRequest) readRequest))));
-                } catch (PlcProtocolException e) {
-                    logger.warn(String.format("Error sending 'read' message: '%s'", e.getMessage()), e);
-                }
-            });
+        RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
+        transaction.submit(() -> {
+            context.sendRequest(tpktPacket)
+                .expectResponse(TPKTPacket.class, REQUEST_TIMEOUT)
+                .onTimeout(future::completeExceptionally)
+                .onError((p, e) -> future.completeExceptionally(e))
+                .check(p -> p.getPayload() instanceof COTPPacketData)
+                .unwrap(p -> ((COTPPacketData) p.getPayload()))
+                .check(p -> p.getPayload() instanceof S7MessageResponse)
+                .unwrap(p -> ((S7MessageResponse) p.getPayload()))
+                .check(p -> p.getTpduReference() == tpduId)
+                .check(p -> p.getParameter() instanceof S7ParameterReadVarResponse)
+                .handle(p -> {
+                    try {
+                        future.complete(((PlcReadResponse) decodeReadResponse(p, ((InternalPlcReadRequest) readRequest))));
+                    } catch (PlcProtocolException e) {
+                        logger.warn(String.format("Error sending 'read' message: '%s'", e.getMessage()), e);
+                    }
+                    transaction.endRequest();
+                });
+        });
         return future;
     }
 
@@ -191,23 +205,27 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> implements Ha
                 new S7PayloadWriteVarRequest(payloadItems.toArray(new S7VarPayloadDataItem[0]))),
             true, (short) tpduId));
 
-        context.sendRequest(tpktPacket)
-            .expectResponse(TPKTPacket.class, REQUEST_TIMEOUT)
-            .onTimeout(future::completeExceptionally)
-            .onError((p, e) -> future.completeExceptionally(e))
-            .check(p -> p.getPayload() instanceof COTPPacketData)
-            .unwrap(p -> ((COTPPacketData) p.getPayload()))
-            .check(p -> p.getPayload() instanceof S7MessageResponse)
-            .unwrap(p -> ((S7MessageResponse) p.getPayload()))
-            .check(p -> p.getTpduReference() == tpduId)
-            .check(p -> p.getParameter() instanceof S7ParameterWriteVarResponse)
-            .handle(p -> {
-                try {
-                    future.complete(((PlcWriteResponse) decodeWriteResponse(p, ((InternalPlcWriteRequest) writeRequest))));
-                } catch (PlcProtocolException e) {
-                    logger.warn(String.format("Error sending 'write' message: '%s'", e.getMessage()), e);
-                }
-            });
+        RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
+        transaction.submit(() -> {
+            context.sendRequest(tpktPacket)
+                .expectResponse(TPKTPacket.class, REQUEST_TIMEOUT)
+                .onTimeout(future::completeExceptionally)
+                .onError((p, e) -> future.completeExceptionally(e))
+                .check(p -> p.getPayload() instanceof COTPPacketData)
+                .unwrap(p -> ((COTPPacketData) p.getPayload()))
+                .check(p -> p.getPayload() instanceof S7MessageResponse)
+                .unwrap(p -> ((S7MessageResponse) p.getPayload()))
+                .check(p -> p.getTpduReference() == tpduId)
+                .check(p -> p.getParameter() instanceof S7ParameterWriteVarResponse)
+                .handle(p -> {
+                    try {
+                        future.complete(((PlcWriteResponse) decodeWriteResponse(p, ((InternalPlcWriteRequest) writeRequest))));
+                    } catch (PlcProtocolException e) {
+                        logger.warn(String.format("Error sending 'write' message: '%s'", e.getMessage()), e);
+                    }
+                    transaction.endRequest();
+                });
+        });
         return future;
     }