You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cg...@apache.org on 2023/01/28 21:16:56 UTC

[plc4x] branch develop updated: Fix issue-602, completely kills the tasks associated with the S7 driver. (#771)

This is an automated email from the ASF dual-hosted git repository.

cgarcia pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 04702e460f Fix issue-602, completely kills the tasks associated with the S7 driver. (#771)
04702e460f is described below

commit 04702e460f91ebaa1e0f80f4f73c2303ec585507
Author: César José García León <ce...@gmail.com>
AuthorDate: Sat Jan 28 17:16:47 2023 -0400

    Fix issue-602, completely kills the tasks associated with the S7 driver. (#771)
    
    Co-authored-by: César García <ce...@ceos.com.ve>
---
 .../s7/readwrite/protocol/S7ProtocolLogic.java     | 69 +++++++++++++++++++++-
 .../spi/transaction/RequestTransactionManager.java | 23 +++++++-
 2 files changed, 89 insertions(+), 3 deletions(-)

diff --git a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
index a5277d4938..f36a06cd5f 100644
--- a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
+++ b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
@@ -58,11 +58,15 @@ import java.util.*;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.stream.IntStream;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.commons.lang3.tuple.MutablePair;
 
 import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
 import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
@@ -85,7 +89,16 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> {
 
     private final Logger logger = LoggerFactory.getLogger(S7ProtocolLogic.class);
     private final AtomicInteger tpduGenerator = new AtomicInteger(10);
-
+    
+    /*
+     * Task group for managing connection redundancy.
+     */
+    private ExecutorService clientExecutorService = Executors.newFixedThreadPool(4, new BasicThreadFactory.Builder()
+                                                    .namingPattern("plc4x-app-thread-%d")
+                                                    .daemon(true)
+                                                    .priority(Thread.MAX_PRIORITY)
+                                                    .build());    
+    
     /*
      * Take into account that the size of this buffer depends on the final device.
      * S7-300 goes from 20 to 300 and for S7-400 it goes from 300 to 10000.
@@ -101,6 +114,17 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> {
     private final S7PlcSubscriptionHandle usrHandle = new S7PlcSubscriptionHandle(EventType.USR, EventLogic);
     private final S7PlcSubscriptionHandle almHandle = new S7PlcSubscriptionHandle(EventType.ALM, EventLogic);
 
+    /*
+    * To support reconnecting after a connection "TimeOut", the open
+    * transactions must be tracked. In general, an S7 device
+    * supports a couple of simultaneous requests.
+    * The pace of execution must be determined by the TransactionManager.
+    * So far, this is the way to indicate to users that they must redo
+    * their request.
+    */
+    private HashMap<Object,MutablePair<RequestTransactionManager.RequestTransaction, Object>> active_requests = new HashMap<>();
+
+    
     private S7DriverContext s7DriverContext;
     private RequestTransactionManager tm;
 
@@ -203,6 +227,27 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> {
             });
     }
 
+    
+    /*
+     * Performs the sequential shutdown of the driver: pending requests,
+     * executors, event handling and finally the channel itself.
+     */
+    @Override
+    public void onDisconnect(ConversationContext<TPKTPacket> context) {
+        //1. Clear all pending requests and their associated transactions.
+        cleanFutures(); 
+        //2. Shut down the local task executor.
+        clientExecutorService.shutdown();
+        //3. Shut down the executor of the transaction manager.
+        tm.shutdown();
+        //4. Stop the tasks that handle events.
+        EventLogic.stop();
+        //5. Close the main channel.
+        context.getChannel().close();
+        //6. Stop here any task or state machine that gets added in the future.
+    }
+
+   
     @Override
     public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) {
         DefaultPlcReadRequest request = (DefaultPlcReadRequest) readRequest;
@@ -1016,5 +1061,27 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> {
             future.completeExceptionally(e);
         }
     }
+    
+    private void cleanFutures(){ // Cancels every tracked request that is not done yet, fails its paired future with "Disconnected", ends its transaction, then empties the map.
+        //TODO: Must be executed if the connection is up.
+        active_requests.forEach((f,p)->{ // f: map key, cast to a future below; p: pair of (transaction, object cast to a future below)
+            CompletableFuture<Object> cf = (CompletableFuture<Object>) f;
+            try {
+                if (!cf.isDone()) {
+                    logger.info("CF");
+                    cf.cancel(true);
+                    logger.info("ClientCF");
+                    ((CompletableFuture<Object>) p.getRight()).completeExceptionally(new PlcRuntimeException("Disconnected"));                     
+                    logger.info("TM");
+                    p.getLeft().endRequest();
+                };
+            } catch (Exception ex){
+                logger.info(ex.toString()); // NOTE(review): failures are only logged at info level, without a stack trace
+            }
+        });
+        active_requests.clear();   
+
+    }    
+    
 
 }
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/transaction/RequestTransactionManager.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/transaction/RequestTransactionManager.java
index 4b42caa338..3592ff2e55 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/transaction/RequestTransactionManager.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/transaction/RequestTransactionManager.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 
 /**
  * This is a limited Queue of Requests, a Protocol can use.
@@ -48,7 +49,14 @@ public class RequestTransactionManager {
     private static final Logger logger = LoggerFactory.getLogger(RequestTransactionManager.class);
 
     /** Executor that performs all operations */
-    static final ExecutorService executor = Executors.newScheduledThreadPool(4);
+    //static final ExecutorService executor = Executors.newScheduledThreadPool(4);
+
+    final ExecutorService executor = Executors.newFixedThreadPool(4, new BasicThreadFactory.Builder()
+                                                    .namingPattern("plc4x-tm-thread-%d")
+                                                    .daemon(true)
+                                                    .priority(Thread.MAX_PRIORITY)
+                                                    .build());    
+    
     private final Set<RequestTransaction> runningRequests;
     /** How many Transactions are allowed to run at the same time? */
     private int numberOfConcurrentRequests;
@@ -83,6 +91,13 @@ public class RequestTransactionManager {
         // As we might have increased the number, try to send some more requests.
         processWorklog();
     }
+    
+    /*
+    * Shuts down the executor, allowing the sequential shutdown of the associated driver.
+    */
+    public void shutdown(){
+        executor.shutdown();
+    }    
 
     public void submit(Consumer<RequestTransaction> context) {
         RequestTransaction transaction = startRequest();
@@ -204,12 +219,16 @@ public class RequestTransactionManager {
             this.delegate = delegate;
         }
 
+        //TODO: Check the MDC usage. It caused an exception in the Hop application.
         @Override
         public void run() {
-            try (final MDC.MDCCloseable closeable = MDC.putCloseable("plc4x.transactionId", Integer.toString(transactionId))) {
+            //try (final MDC.MDCCloseable closeable = MDC.putCloseable("plc4x.transactionId", Integer.toString(transactionId))) {
+            try{    
                 logger.trace("Start execution of transaction {}", transactionId);
                 delegate.run();
                 logger.trace("Completed execution of transaction {}", transactionId);
+            }  catch (Exception ex) {
+                logger.info(ex.getMessage());
             }
         }
     }