You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cg...@apache.org on 2023/01/28 21:16:56 UTC
[plc4x] branch develop updated: Fix issue-602, completely kills the tasks associated with the S7 driver. (#771)
This is an automated email from the ASF dual-hosted git repository.
cgarcia pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 04702e460f Fix issue-602, completely kills the tasks associated with the S7 driver. (#771)
04702e460f is described below
commit 04702e460f91ebaa1e0f80f4f73c2303ec585507
Author: César José García León <ce...@gmail.com>
AuthorDate: Sat Jan 28 17:16:47 2023 -0400
Fix issue-602, completely kills the tasks associated with the S7 driver. (#771)
Co-authored-by: César García <ce...@ceos.com.ve>
---
.../s7/readwrite/protocol/S7ProtocolLogic.java | 69 +++++++++++++++++++++-
.../spi/transaction/RequestTransactionManager.java | 23 +++++++-
2 files changed, 89 insertions(+), 3 deletions(-)
diff --git a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
index a5277d4938..f36a06cd5f 100644
--- a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
+++ b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
@@ -58,11 +58,15 @@ import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.IntStream;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
@@ -85,7 +89,16 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> {
private final Logger logger = LoggerFactory.getLogger(S7ProtocolLogic.class);
private final AtomicInteger tpduGenerator = new AtomicInteger(10);
-
+
+ /*
+ * Task group for managing connection redundancy.
+ */
+ private ExecutorService clientExecutorService = Executors.newFixedThreadPool(4, new BasicThreadFactory.Builder()
+ .namingPattern("plc4x-app-thread-%d")
+ .daemon(true)
+ .priority(Thread.MAX_PRIORITY)
+ .build());
+
/*
* Take into account that the size of this buffer depends on the final device.
* S7-300 goes from 20 to 300 and for S7-400 it goes from 300 to 10000.
@@ -101,6 +114,17 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> {
private final S7PlcSubscriptionHandle usrHandle = new S7PlcSubscriptionHandle(EventType.USR, EventLogic);
private final S7PlcSubscriptionHandle almHandle = new S7PlcSubscriptionHandle(EventType.ALM, EventLogic);
+ /*
+ * For the reconnection functionality by a "TimeOut" of the connection,
+ * you must keep track of open transactions. In general, an S7 device
+ * supports a couple of simultaneous requests.
+ * The rhythm of execution must be determined by the TransactionManager.
+ * So far it is the way to indicate to the user that he must redo
+ * his request.
+ */
+ private HashMap<Object,MutablePair<RequestTransactionManager.RequestTransaction, Object>> active_requests = new HashMap<>();
+
+
private S7DriverContext s7DriverContext;
private RequestTransactionManager tm;
@@ -203,6 +227,27 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> {
});
}
+
+ /*
+ * It performs the sequential and safe shutdown of the driver.
+ * Completion of pending requests, executors and associated tasks.
+ */
+ @Override
+ public void onDisconnect(ConversationContext<TPKTPacket> context) {
+ //1. Clear all pending requests and their associated transaction
+ cleanFutures();
+ //2. Here we shutdown the local task executor.
+ clientExecutorService.shutdown();
+ //3. Performs the shutdown of the transaction executor.
+ tm.shutdown();
+ //4. Finish the execution of the tasks for the handling of Events.
+ EventLogic.stop();
+ //5. Executes the closing of the main channel.
+ context.getChannel().close();
+ //6. Here is the stop of any task or state machine that is added.
+ }
+
+
@Override
public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) {
DefaultPlcReadRequest request = (DefaultPlcReadRequest) readRequest;
@@ -1016,5 +1061,27 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> {
future.completeExceptionally(e);
}
}
+
+ private void cleanFutures(){
+ //TODO: Debe ser ejecutado si la conexion esta levanta.
+ active_requests.forEach((f,p)->{
+ CompletableFuture<Object> cf = (CompletableFuture<Object>) f;
+ try {
+ if (!cf.isDone()) {
+ logger.info("CF");
+ cf.cancel(true);
+ logger.info("ClientCF");
+ ((CompletableFuture<Object>) p.getRight()).completeExceptionally(new PlcRuntimeException("Disconnected"));
+ logger.info("TM");
+ p.getLeft().endRequest();
+ };
+ } catch (Exception ex){
+ logger.info(ex.toString());
+ }
+ });
+ active_requests.clear();
+
+ }
+
}
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/transaction/RequestTransactionManager.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/transaction/RequestTransactionManager.java
index 4b42caa338..3592ff2e55 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/transaction/RequestTransactionManager.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/transaction/RequestTransactionManager.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
/**
* This is a limited Queue of Requests, a Protocol can use.
@@ -48,7 +49,14 @@ public class RequestTransactionManager {
private static final Logger logger = LoggerFactory.getLogger(RequestTransactionManager.class);
/** Executor that performs all operations */
- static final ExecutorService executor = Executors.newScheduledThreadPool(4);
+ //static final ExecutorService executor = Executors.newScheduledThreadPool(4);
+
+ final ExecutorService executor = Executors.newFixedThreadPool(4, new BasicThreadFactory.Builder()
+ .namingPattern("plc4x-tm-thread-%d")
+ .daemon(true)
+ .priority(Thread.MAX_PRIORITY)
+ .build());
+
private final Set<RequestTransaction> runningRequests;
/** How many Transactions are allowed to run at the same time? */
private int numberOfConcurrentRequests;
@@ -83,6 +91,13 @@ public class RequestTransactionManager {
// As we might have increased the number, try to send some more requests.
processWorklog();
}
+
+ /*
+ * It allows the sequential shutdown of the associated driver.
+ */
+ public void shutdown(){
+ executor.shutdown();
+ }
public void submit(Consumer<RequestTransaction> context) {
RequestTransaction transaction = startRequest();
@@ -204,12 +219,16 @@ public class RequestTransactionManager {
this.delegate = delegate;
}
+ //TODO: Check MDC used. Created exception in Hop application
@Override
public void run() {
- try (final MDC.MDCCloseable closeable = MDC.putCloseable("plc4x.transactionId", Integer.toString(transactionId))) {
+ //try (final MDC.MDCCloseable closeable = MDC.putCloseable("plc4x.transactionId", Integer.toString(transactionId))) {
+ try{
logger.trace("Start execution of transaction {}", transactionId);
delegate.run();
logger.trace("Completed execution of transaction {}", transactionId);
+ } catch (Exception ex) {
+ logger.info(ex.getMessage());
}
}
}