You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by jf...@apache.org on 2019/12/28 22:04:44 UTC
[plc4x] branch next-gen-core updated: Implemented Generic
RequestTransactionManager and implemented it in S7 Protocol Logic.
This is an automated email from the ASF dual-hosted git repository.
jfeinauer pushed a commit to branch next-gen-core
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/next-gen-core by this push:
new 8a3576b Implemented Generic RequestTransactionManager and implemented it in S7 Protocol Logic.
8a3576b is described below
commit 8a3576b0059993d99bd1af288c4a8f0445118cc7
Author: Julian Feinauer <j....@pragmaticminds.de>
AuthorDate: Sat Dec 28 23:04:03 2019 +0100
Implemented Generic RequestTransactionManager and implemented it in S7 Protocol Logic.
---
.../spi/optimizer/RequestTransactionManager.java | 180 +++++++++++++++++++++
.../optimizer/RequestTransactionManagerTest.java | 170 +++++++++++++++++++
sandbox/test-java-s7-driver/pom.xml | 5 +
.../s7/readwrite/protocol/S7ProtocolLogic.java | 86 ++++++----
4 files changed, 407 insertions(+), 34 deletions(-)
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/optimizer/RequestTransactionManager.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/optimizer/RequestTransactionManager.java
new file mode 100644
index 0000000..d90ad37
--- /dev/null
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/optimizer/RequestTransactionManager.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.spi.optimizer;
+
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+/**
+ * This is a limited Queue of Requests, a Protocol can use.
+ * <p>
+ * The Following Steps are necessary
+ * <ul>
+ * <li>Register Slot</li>
+ * <li>Pass Runnable</li>
+ * <li>On Request or Response unregister Slot</li>
+ * </ul>
+ */
+public class RequestTransactionManager {
+
+ /** Executor that performs all operations */
+ static final ExecutorService executor = Executors.newFixedThreadPool(4);
+ private final Set<RequestTransaction> runningRequests;
+ /** How many Transactions are allowed to run at the same time? */
+ private int numberOfConcurrentRequests;
+ /** Assigns each request a Unique Transaction Id, especially important for failure handling */
+ private AtomicInteger transactionId = new AtomicInteger(0);
+ /** Important, this is a FIFO Queue for Fairness! */
+ private Queue<RequestTransaction> workLog = new ConcurrentLinkedQueue<>();
+
+ public RequestTransactionManager(int numberOfConcurrentRequests) {
+ this.numberOfConcurrentRequests = numberOfConcurrentRequests;
+ // Immutable Map
+ runningRequests = ConcurrentHashMap.newKeySet();
+ }
+
+ public RequestTransactionManager() {
+ this(1);
+ }
+
+ public int getNumberOfConcurrentRequests() {
+ return numberOfConcurrentRequests;
+ }
+
+ public void setNumberOfConcurrentRequests(int numberOfConcurrentRequests) {
+ this.numberOfConcurrentRequests = numberOfConcurrentRequests;
+ }
+
+ public void submit(Consumer<RequestTransaction> context) {
+ RequestTransaction transaction = startRequest();
+ context.accept(transaction);
+ // this.submit(transaction);
+ }
+
+ void submit(RequestTransaction handle) {
+ assert handle.operation != null;
+ // Add this Request with this handle i the Worklog
+ // Put Transaction into Worklog
+ this.workLog.add(handle);
+ // Try to Process the Worklog
+ processWorklog();
+ }
+
+ private void processWorklog() {
+ while (runningRequests.size() < getNumberOfConcurrentRequests() && !workLog.isEmpty()) {
+ RequestTransaction next = workLog.remove();
+ this.runningRequests.add(next);
+ Future<?> completionFuture = executor.submit(next.operation);
+ next.setCompletionFuture(completionFuture);
+ }
+ }
+
+
+ public RequestTransaction startRequest() {
+ return new RequestTransaction(this, this.transactionId.getAndIncrement());
+ }
+
+ public int getNumberOfActiveRequests() {
+ return this.runningRequests.size();
+ }
+
+ private void failRequest(RequestTransaction transaction) {
+ // Try to fail it!
+ transaction.getCompletionFuture().cancel(true);
+ // End it
+ endRequest(transaction);
+ }
+
+ private void endRequest(RequestTransaction transaction) {
+ if (!this.runningRequests.contains(transaction)) {
+ throw new IllegalArgumentException("Unknown Transaction or Transaction already finished!");
+ }
+ this.runningRequests.remove(transaction);
+ // Process the worklog, a slot should be free now
+ processWorklog();
+ }
+
+ public static class RequestTransaction {
+
+ private final RequestTransactionManager parent;
+ private final int transactionId;
+
+ /** The iniital operation to perform to kick off the request */
+ private Runnable operation;
+ private Future<?> completionFuture;
+
+ public RequestTransaction(RequestTransactionManager parent, int transactionId) {
+ this.parent = parent;
+ this.transactionId = transactionId;
+ }
+
+ public void start() {
+
+ }
+
+ public void failRequest(Throwable t) {
+ this.parent.failRequest(this);
+ }
+
+ public void endRequest() {
+ // Remove it from Running Requests
+ this.parent.endRequest(this);
+ }
+
+ public void setOperation(Runnable operation) {
+ this.operation = operation;
+ }
+
+ public Future<?> getCompletionFuture() {
+ return completionFuture;
+ }
+
+ public void setCompletionFuture(Future<?> completionFuture) {
+ this.completionFuture = completionFuture;
+ }
+
+ public void submit(Runnable operation) {
+ this.setOperation(operation);
+ this.parent.submit(this);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ RequestTransaction that = (RequestTransaction) o;
+ return transactionId == that.transactionId;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(transactionId);
+ }
+ }
+
+}
diff --git a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/optimizer/RequestTransactionManagerTest.java b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/optimizer/RequestTransactionManagerTest.java
new file mode 100644
index 0000000..e92adc4
--- /dev/null
+++ b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/optimizer/RequestTransactionManagerTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.spi.optimizer;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class RequestTransactionManagerTest {
+
+ private static final Logger logger = LoggerFactory.getLogger(RequestTransactionManagerTest.class);
+
+ @Test
+ public void simpleExample() throws ExecutionException, InterruptedException {
+ CompletableFuture<Void> sendRequest = new CompletableFuture<>();
+ CompletableFuture<Void> receiveResponse = new CompletableFuture<>();
+ CompletableFuture<Void> transactionIsFinished = new CompletableFuture<>();
+
+ RequestTransactionManager tm = new RequestTransactionManager();
+
+ // Assert there is no request going on
+ assertEquals(0, tm.getNumberOfActiveRequests());
+ // Send a Request
+ sendRequest(tm, sendRequest, receiveResponse, transactionIsFinished);
+ // Assert that there is a request going on
+ sendRequest.get();
+ assertEquals(1, tm.getNumberOfActiveRequests());
+ // Finish the Request with a response
+ receiveResponse.complete(null);
+ // Wait till async operation completed
+ transactionIsFinished.get();
+ // Here, there should no longer be a running request
+ // Assert that there is no request going on
+ assertEquals(0, tm.getNumberOfActiveRequests());
+ }
+
+ @Test
+ public void exampleWithMultipleRequests() throws ExecutionException, InterruptedException {
+ CompletableFuture<Void> sendRequest1 = new CompletableFuture<>();
+ CompletableFuture<Void> endRequest1 = new CompletableFuture<>();
+ CompletableFuture<Void> requestIsEnded1 = new CompletableFuture<>();
+ CompletableFuture<Void> sendRequest2 = new CompletableFuture<>();
+ CompletableFuture<Void> endRequest2 = new CompletableFuture<>();
+ CompletableFuture<Void> requestIsEnded2 = new CompletableFuture<>();
+
+ RequestTransactionManager tm = new RequestTransactionManager();
+
+ // Assert there is no request going on
+ assertEquals(0, tm.getNumberOfActiveRequests());
+ // Send Request 1
+ sendRequest(tm, sendRequest1, endRequest1, requestIsEnded1);
+ // Send Request 2
+ sendRequest(tm, sendRequest2, endRequest2, requestIsEnded2);
+
+ // Assert that there is a request going on
+ sendRequest1.get();
+ assertEquals(1, tm.getNumberOfActiveRequests());
+ // Finish the Request with a response
+ endRequest1.complete(null);
+ // Wait till async operation (and transaction end) completed
+ requestIsEnded1.get();
+ // Request 2 should now be processed and finish
+ sendRequest2.get();
+ endRequest2.complete(null);
+ requestIsEnded2.get();
+ assertEquals(0, tm.getNumberOfActiveRequests());
+ }
+
+ @Test
+ public void version2() throws ExecutionException, InterruptedException {
+ CompletableFuture<Void> sendRequest = new CompletableFuture<>();
+ CompletableFuture<Void> receiveResponse = new CompletableFuture<>();
+ CompletableFuture<Void> transactionIsFinished = new CompletableFuture<>();
+
+ RequestTransactionManager tm = new RequestTransactionManager();
+ RequestTransactionManager.RequestTransaction handle = tm.startRequest();
+ handle.submit(() -> {
+ // ...
+ sendRequest.complete(null);
+ // Receive
+ receiveResponse.thenAccept((n) -> {
+ handle.endRequest();
+ transactionIsFinished.complete(null);
+ });
+ });
+
+// // Exception case
+// handle.failRequest(new RuntimeException());
+
+ // Assert that there is a request going on
+ sendRequest.get();
+ assertEquals(1, tm.getNumberOfActiveRequests());
+ // Finish the Request with a response
+ receiveResponse.complete(null);
+ // Wait till async operation completed
+ transactionIsFinished.get();
+ // Here, there should no longer be a running request
+ // Assert that there is no request going on
+ assertEquals(0, tm.getNumberOfActiveRequests());
+ }
+
+ @Test
+ public void abortTransactionFromExternally() throws ExecutionException, InterruptedException {
+ CompletableFuture<Void> sendRequest = new CompletableFuture<>();
+ CompletableFuture<Void> receiveResponse = new CompletableFuture<>();
+ CompletableFuture<Void> transactionIsFinished = new CompletableFuture<>();
+
+ RequestTransactionManager tm = new RequestTransactionManager();
+ RequestTransactionManager.RequestTransaction handle = tm.startRequest();
+ handle.submit(() -> {
+ // ...
+ sendRequest.complete(null);
+ // Receive
+ receiveResponse.thenAccept((n) -> {
+ handle.endRequest();
+ transactionIsFinished.complete(null);
+ });
+ });
+
+ // Assert that there is a request going on
+ sendRequest.get();
+
+ // Exception case
+ handle.failRequest(new RuntimeException());
+
+ // Wait some millis?
+ assertEquals(0, tm.getNumberOfActiveRequests());
+
+ // Assert that its cancelled
+ assertTrue(handle.getCompletionFuture().isCancelled());
+ }
+
+ private void sendRequest(RequestTransactionManager tm, CompletableFuture<Void> sendRequest, CompletableFuture<Void> endRequest, CompletableFuture<Void> requestIsEnded) {
+ tm.submit(handle -> {
+ handle.submit(() -> {
+ // Wait till the Request is sent
+ sendRequest.complete(null);
+ // Receive
+ endRequest.thenAccept((n) -> {
+ handle.endRequest();
+ requestIsEnded.complete(null);
+ });
+ });
+ });
+ }
+
+}
\ No newline at end of file
diff --git a/sandbox/test-java-s7-driver/pom.xml b/sandbox/test-java-s7-driver/pom.xml
index e8e1210..f014a3a 100644
--- a/sandbox/test-java-s7-driver/pom.xml
+++ b/sandbox/test-java-s7-driver/pom.xml
@@ -108,6 +108,11 @@
<!-- Scope is 'provided' as this way it's not shipped with the driver -->
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
index 427f560..3b7c457 100644
--- a/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
+++ b/sandbox/test-java-s7-driver/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
@@ -47,6 +47,7 @@ import org.apache.plc4x.java.spi.messages.*;
import org.apache.plc4x.java.spi.generation.ParseException;
import org.apache.plc4x.java.spi.generation.ReadBuffer;
import org.apache.plc4x.java.spi.generation.WriteBuffer;
+import org.apache.plc4x.java.spi.optimizer.RequestTransactionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,6 +59,11 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
+/**
+ * The S7 Protocol states that there can not be more then {min(maxAmqCaller, maxAmqCallee} "ongoing" requests.
+ * So we need to limit those.
+ * Thus, each request goes to a Work Queue and this Queue ensures, that only 3 are open at the same time.
+ */
public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> implements HasConfiguration<S7Configuration> {
private static final Logger logger = LoggerFactory.getLogger(S7ProtocolLogic.class);
@@ -72,6 +78,7 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> implements Ha
private S7ControllerType controllerType;
private static final AtomicInteger tpduGenerator = new AtomicInteger(10);
+ private RequestTransactionManager tm;
@Override public void setConfiguration(S7Configuration configuration) {
this.callingTsapId = S7TsapIdEncoder.encodeS7TsapId(DeviceGroup.PG_OR_PC, configuration.rack, configuration.slot);
@@ -84,6 +91,9 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> implements Ha
this.pduSize = cotpTpduSize.getSizeInBytes() - 16;
this.maxAmqCaller = configuration.maxAmqCaller;
this.maxAmqCallee = configuration.maxAmqCallee;
+
+ // Initialize Transaction Manager
+ this.tm = new RequestTransactionManager(Math.min(this.maxAmqCallee, this.maxAmqCaller));
}
@Override
@@ -152,23 +162,27 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> implements Ha
new S7PayloadReadVarRequest()),
true, (short) tpduId));
- context.sendRequest(tpktPacket)
- .expectResponse(TPKTPacket.class, REQUEST_TIMEOUT)
- .onTimeout(future::completeExceptionally)
- .onError((p, e) -> future.completeExceptionally(e))
- .check(p -> p.getPayload() instanceof COTPPacketData)
- .unwrap(p -> ((COTPPacketData) p.getPayload()))
- .check(p -> p.getPayload() instanceof S7MessageResponse)
- .unwrap(p -> ((S7MessageResponse) p.getPayload()))
- .check(p -> p.getTpduReference() == tpduId)
- .check(p -> p.getParameter() instanceof S7ParameterReadVarResponse)
- .handle(p -> {
- try {
- future.complete(((PlcReadResponse) decodeReadResponse(p, ((InternalPlcReadRequest) readRequest))));
- } catch (PlcProtocolException e) {
- logger.warn(String.format("Error sending 'read' message: '%s'", e.getMessage()), e);
- }
- });
+ RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
+ transaction.submit(() -> {
+ context.sendRequest(tpktPacket)
+ .expectResponse(TPKTPacket.class, REQUEST_TIMEOUT)
+ .onTimeout(future::completeExceptionally)
+ .onError((p, e) -> future.completeExceptionally(e))
+ .check(p -> p.getPayload() instanceof COTPPacketData)
+ .unwrap(p -> ((COTPPacketData) p.getPayload()))
+ .check(p -> p.getPayload() instanceof S7MessageResponse)
+ .unwrap(p -> ((S7MessageResponse) p.getPayload()))
+ .check(p -> p.getTpduReference() == tpduId)
+ .check(p -> p.getParameter() instanceof S7ParameterReadVarResponse)
+ .handle(p -> {
+ try {
+ future.complete(((PlcReadResponse) decodeReadResponse(p, ((InternalPlcReadRequest) readRequest))));
+ } catch (PlcProtocolException e) {
+ logger.warn(String.format("Error sending 'read' message: '%s'", e.getMessage()), e);
+ }
+ transaction.endRequest();
+ });
+ });
return future;
}
@@ -191,23 +205,27 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> implements Ha
new S7PayloadWriteVarRequest(payloadItems.toArray(new S7VarPayloadDataItem[0]))),
true, (short) tpduId));
- context.sendRequest(tpktPacket)
- .expectResponse(TPKTPacket.class, REQUEST_TIMEOUT)
- .onTimeout(future::completeExceptionally)
- .onError((p, e) -> future.completeExceptionally(e))
- .check(p -> p.getPayload() instanceof COTPPacketData)
- .unwrap(p -> ((COTPPacketData) p.getPayload()))
- .check(p -> p.getPayload() instanceof S7MessageResponse)
- .unwrap(p -> ((S7MessageResponse) p.getPayload()))
- .check(p -> p.getTpduReference() == tpduId)
- .check(p -> p.getParameter() instanceof S7ParameterWriteVarResponse)
- .handle(p -> {
- try {
- future.complete(((PlcWriteResponse) decodeWriteResponse(p, ((InternalPlcWriteRequest) writeRequest))));
- } catch (PlcProtocolException e) {
- logger.warn(String.format("Error sending 'write' message: '%s'", e.getMessage()), e);
- }
- });
+ RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
+ transaction.submit(() -> {
+ context.sendRequest(tpktPacket)
+ .expectResponse(TPKTPacket.class, REQUEST_TIMEOUT)
+ .onTimeout(future::completeExceptionally)
+ .onError((p, e) -> future.completeExceptionally(e))
+ .check(p -> p.getPayload() instanceof COTPPacketData)
+ .unwrap(p -> ((COTPPacketData) p.getPayload()))
+ .check(p -> p.getPayload() instanceof S7MessageResponse)
+ .unwrap(p -> ((S7MessageResponse) p.getPayload()))
+ .check(p -> p.getTpduReference() == tpduId)
+ .check(p -> p.getParameter() instanceof S7ParameterWriteVarResponse)
+ .handle(p -> {
+ try {
+ future.complete(((PlcWriteResponse) decodeWriteResponse(p, ((InternalPlcWriteRequest) writeRequest))));
+ } catch (PlcProtocolException e) {
+ logger.warn(String.format("Error sending 'write' message: '%s'", e.getMessage()), e);
+ }
+ transaction.endRequest();
+ });
+ });
return future;
}