You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2020/08/31 12:12:25 UTC

[plc4x] branch develop updated (d83b8ff -> a4f11e2)

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git.


    from d83b8ff  - Made the transaction-id reset to 1 when reaching the 0xFFFF
     new 72cb97f  PLC4X-245 - [Modbus] Apache NiFi processor throws java.io.IOException after a while
     new a4f11e2  PLC4X-132 - [S7] Communication to S7 PLC dies in some situations

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../src/test/resources/testsuite/AdsDriverIT.xml   | 218 ++++++++++++++++++++-
 .../s7/readwrite/protocol/S7ProtocolLogic.java     |  62 +++++-
 .../s7/src/test/resources/testsuite/S7DriverIT.xml |  75 +++++++
 .../apache-nifi/nifi-plc4x-processors/pom.xml      |   6 +
 .../org/apache/plc4x/nifi/BasePlc4xProcessor.java  |  23 +--
 .../org/apache/plc4x/nifi/Plc4xSinkProcessor.java  |  50 ++---
 .../apache/plc4x/nifi/Plc4xSourceProcessor.java    |  63 +++---
 .../plc4x/test/driver/DriverTestsuiteRunner.java   |  35 ++--
 .../plc4x/test/driver/model/DriverTestsuite.java   |  20 +-
 9 files changed, 444 insertions(+), 108 deletions(-)


[plc4x] 02/02: PLC4X-132 - [S7] Communication to S7 PLC dies in some situations

Posted by cd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit a4f11e2643fbe76ffc552cf8bd2695819cc20083
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Mon Aug 31 14:12:17 2020 +0200

    PLC4X-132 - [S7] Communication to S7 PLC dies in some situations
    
    Made the S7 driver correctly handle error responses. Started adding explicit handling of known error codes with suggestions of how to resolve the issue.develop
    
    NO JIRA:
    - Updated the integration test to correctly initialize a new connection for every testcase (Making it possible to run only individual testcases)
---
 .../src/test/resources/testsuite/AdsDriverIT.xml   | 218 ++++++++++++++++++++-
 .../s7/readwrite/protocol/S7ProtocolLogic.java     |  62 +++++-
 .../s7/src/test/resources/testsuite/S7DriverIT.xml |  75 +++++++
 .../plc4x/test/driver/DriverTestsuiteRunner.java   |  35 ++--
 .../plc4x/test/driver/model/DriverTestsuite.java   |  20 +-
 5 files changed, 370 insertions(+), 40 deletions(-)

diff --git a/plc4j/drivers/ads/src/test/resources/testsuite/AdsDriverIT.xml b/plc4j/drivers/ads/src/test/resources/testsuite/AdsDriverIT.xml
index 9f0731c..6eb1f65 100644
--- a/plc4j/drivers/ads/src/test/resources/testsuite/AdsDriverIT.xml
+++ b/plc4j/drivers/ads/src/test/resources/testsuite/AdsDriverIT.xml
@@ -167,7 +167,7 @@
   </testcase>
 
   <testcase>
-    <name>Multi element direct read request</name>
+    <name>Multi-element direct read request</name>
     <description>
       When doing a simple read request with only direct addresses, but multiple
       ones, the unofficial multi-read method should be used to read all in one go.
@@ -221,7 +221,7 @@
               <broadcast>false</broadcast>
             </state>
             <errorCode>0</errorCode>
-            <invokeId>2</invokeId>
+            <invokeId>1</invokeId>
             <data className="org.apache.plc4x.java.ads.readwrite.AdsReadWriteRequest">
               <indexGroup>61568</indexGroup>
               <indexOffset>2</indexOffset>
@@ -277,7 +277,7 @@
               <broadcast>false</broadcast>
             </state>
             <errorCode>0</errorCode>
-            <invokeId>2</invokeId>
+            <invokeId>1</invokeId>
             <data className="org.apache.plc4x.java.ads.readwrite.AdsReadWriteResponse">
               <result>OK</result>
               <data>AAAAAAAAAAABAQ==</data>
@@ -371,7 +371,7 @@
               <broadcast>false</broadcast>
             </state>
             <errorCode>0</errorCode>
-            <invokeId>3</invokeId>
+            <invokeId>1</invokeId>
             <data className="org.apache.plc4x.java.ads.readwrite.AdsReadWriteRequest">
               <indexGroup>61443</indexGroup>
               <indexOffset>0</indexOffset>
@@ -416,7 +416,7 @@
               <broadcast>false</broadcast>
             </state>
             <errorCode>0</errorCode>
-            <invokeId>3</invokeId>
+            <invokeId>1</invokeId>
             <data className="org.apache.plc4x.java.ads.readwrite.AdsReadResponse">
               <result>OK</result>
               <data>AQCAGw==</data>
@@ -458,7 +458,7 @@
               <broadcast>false</broadcast>
             </state>
             <errorCode>0</errorCode>
-            <invokeId>4</invokeId>
+            <invokeId>2</invokeId>
             <data className="org.apache.plc4x.java.ads.readwrite.AdsReadRequest">
               <indexGroup>61445</indexGroup>
               <indexOffset>461373441</indexOffset>
@@ -501,7 +501,7 @@
               <broadcast>false</broadcast>
             </state>
             <errorCode>0</errorCode>
-            <invokeId>4</invokeId>
+            <invokeId>2</invokeId>
             <data className="org.apache.plc4x.java.ads.readwrite.AdsReadResponse">
               <result>OK</result>
               <data>AAAAAAAAAAABAQ==</data>
@@ -547,6 +547,93 @@
           </fields>
         </TestReadRequest>
       </api-request>
+      <outgoing-plc-message name="Send Resolve Symbolic Address Request">
+        <AmsTCPPacket className="org.apache.plc4x.java.ads.readwrite.AmsTCPPacket">
+          <userdata className="org.apache.plc4x.java.ads.readwrite.AmsPacket">
+            <targetAmsNetId className="org.apache.plc4x.java.ads.readwrite.AmsNetId">
+              <octet1>192</octet1>
+              <octet2>168</octet2>
+              <octet3>23</octet3>
+              <octet4>20</octet4>
+              <octet5>1</octet5>
+              <octet6>1</octet6>
+            </targetAmsNetId>
+            <targetAmsPort>48898</targetAmsPort>
+            <sourceAmsNetId className="org.apache.plc4x.java.ads.readwrite.AmsNetId">
+              <octet1>192</octet1>
+              <octet2>168</octet2>
+              <octet3>23</octet3>
+              <octet4>200</octet4>
+              <octet5>1</octet5>
+              <octet6>1</octet6>
+            </sourceAmsNetId>
+            <sourceAmsPort>48898</sourceAmsPort>
+            <commandId>ADS_READ_WRITE</commandId>
+            <state className="org.apache.plc4x.java.ads.readwrite.State">
+              <initCommand>false</initCommand>
+              <updCommand>false</updCommand>
+              <timestampAdded>false</timestampAdded>
+              <highPriorityCommand>false</highPriorityCommand>
+              <systemCommand>false</systemCommand>
+              <adsCommand>true</adsCommand>
+              <noReturn>false</noReturn>
+              <response>false</response>
+              <broadcast>false</broadcast>
+            </state>
+            <errorCode>0</errorCode>
+            <invokeId>1</invokeId>
+            <data className="org.apache.plc4x.java.ads.readwrite.AdsReadWriteRequest">
+              <indexGroup>61443</indexGroup>
+              <indexOffset>0</indexOffset>
+              <readLength>4</readLength>
+              <items/>
+              <data>bWFpbi5mX3RyaWdEYXRlaUdlbGVzZW4uTQA=</data>
+            </data>
+          </userdata>
+        </AmsTCPPacket>
+      </outgoing-plc-message>
+      <incoming-plc-message name="Receive Resolve Symbolic Address Response">
+        <AmsTCPPacket className="org.apache.plc4x.java.ads.readwrite.AmsTCPPacket">
+          <userdata className="org.apache.plc4x.java.ads.readwrite.AmsPacket">
+            <targetAmsNetId className="org.apache.plc4x.java.ads.readwrite.AmsNetId">
+              <octet1>192</octet1>
+              <octet2>168</octet2>
+              <octet3>23</octet3>
+              <octet4>200</octet4>
+              <octet5>1</octet5>
+              <octet6>1</octet6>
+            </targetAmsNetId>
+            <targetAmsPort>48898</targetAmsPort>
+            <sourceAmsNetId className="org.apache.plc4x.java.ads.readwrite.AmsNetId">
+              <octet1>192</octet1>
+              <octet2>168</octet2>
+              <octet3>23</octet3>
+              <octet4>20</octet4>
+              <octet5>1</octet5>
+              <octet6>1</octet6>
+            </sourceAmsNetId>
+            <sourceAmsPort>48898</sourceAmsPort>
+            <commandId>ADS_READ_WRITE</commandId>
+            <state className="org.apache.plc4x.java.ads.readwrite.State">
+              <initCommand>false</initCommand>
+              <updCommand>false</updCommand>
+              <timestampAdded>false</timestampAdded>
+              <highPriorityCommand>false</highPriorityCommand>
+              <systemCommand>false</systemCommand>
+              <adsCommand>true</adsCommand>
+              <noReturn>false</noReturn>
+              <response>true</response>
+              <broadcast>false</broadcast>
+            </state>
+            <errorCode>0</errorCode>
+            <invokeId>1</invokeId>
+            <data className="org.apache.plc4x.java.ads.readwrite.AdsReadResponse">
+              <result>OK</result>
+              <data>AQCAGw==</data>
+            </data>
+          </userdata>
+        </AmsTCPPacket>
+      </incoming-plc-message>
       <outgoing-plc-message name="Send Ads Read Request">
         <AmsTCPPacket className="org.apache.plc4x.java.ads.readwrite.AmsTCPPacket">
           <userdata className="org.apache.plc4x.java.ads.readwrite.AmsPacket">
@@ -581,7 +668,7 @@
               <broadcast>false</broadcast>
             </state>
             <errorCode>0</errorCode>
-            <invokeId>5</invokeId>
+            <invokeId>2</invokeId>
             <data className="org.apache.plc4x.java.ads.readwrite.AdsReadRequest">
               <indexGroup>61445</indexGroup>
               <indexOffset>461373441</indexOffset>
@@ -624,7 +711,7 @@
               <broadcast>false</broadcast>
             </state>
             <errorCode>0</errorCode>
-            <invokeId>5</invokeId>
+            <invokeId>2</invokeId>
             <data className="org.apache.plc4x.java.ads.readwrite.AdsReadResponse">
               <result>OK</result>
               <data>AAAAAAAAAAABAQ==</data>
@@ -649,6 +736,119 @@
           </hurz1>
         </DefaultPlcReadResponse>
       </api-response>
+      <delay>500</delay>
+      <api-request name="Receive a second Read Request for the same resource from application">
+        <TestReadRequest className="org.apache.plc4x.test.driver.model.api.TestReadRequest">
+          <fields>
+            <field className="org.apache.plc4x.test.driver.model.api.TestField">
+              <name>hurz1</name>
+              <address>main.f_trigDateiGelesen.M:BOOL</address>
+            </field>
+          </fields>
+        </TestReadRequest>
+      </api-request>
+      <outgoing-plc-message name="Send Ads Read Request directly using the preciously resolved address">
+        <AmsTCPPacket className="org.apache.plc4x.java.ads.readwrite.AmsTCPPacket">
+          <userdata className="org.apache.plc4x.java.ads.readwrite.AmsPacket">
+            <targetAmsNetId className="org.apache.plc4x.java.ads.readwrite.AmsNetId">
+              <octet1>192</octet1>
+              <octet2>168</octet2>
+              <octet3>23</octet3>
+              <octet4>20</octet4>
+              <octet5>1</octet5>
+              <octet6>1</octet6>
+            </targetAmsNetId>
+            <targetAmsPort>48898</targetAmsPort>
+            <sourceAmsNetId className="org.apache.plc4x.java.ads.readwrite.AmsNetId">
+              <octet1>192</octet1>
+              <octet2>168</octet2>
+              <octet3>23</octet3>
+              <octet4>200</octet4>
+              <octet5>1</octet5>
+              <octet6>1</octet6>
+            </sourceAmsNetId>
+            <sourceAmsPort>48898</sourceAmsPort>
+            <commandId>ADS_READ</commandId>
+            <state className="org.apache.plc4x.java.ads.readwrite.State">
+              <initCommand>false</initCommand>
+              <updCommand>false</updCommand>
+              <timestampAdded>false</timestampAdded>
+              <highPriorityCommand>false</highPriorityCommand>
+              <systemCommand>false</systemCommand>
+              <adsCommand>true</adsCommand>
+              <noReturn>false</noReturn>
+              <response>false</response>
+              <broadcast>false</broadcast>
+            </state>
+            <errorCode>0</errorCode>
+            <invokeId>3</invokeId>
+            <data className="org.apache.plc4x.java.ads.readwrite.AdsReadRequest">
+              <indexGroup>61445</indexGroup>
+              <indexOffset>461373441</indexOffset>
+              <length>1</length>
+            </data>
+          </userdata>
+        </AmsTCPPacket>
+      </outgoing-plc-message>
+      <incoming-plc-message name="Receive Ads Read Response again">
+        <AmsTCPPacket className="org.apache.plc4x.java.ads.readwrite.AmsTCPPacket">
+          <userdata className="org.apache.plc4x.java.ads.readwrite.AmsPacket">
+            <targetAmsNetId className="org.apache.plc4x.java.ads.readwrite.AmsNetId">
+              <octet1>192</octet1>
+              <octet2>168</octet2>
+              <octet3>23</octet3>
+              <octet4>200</octet4>
+              <octet5>1</octet5>
+              <octet6>1</octet6>
+            </targetAmsNetId>
+            <targetAmsPort>48898</targetAmsPort>
+            <sourceAmsNetId className="org.apache.plc4x.java.ads.readwrite.AmsNetId">
+              <octet1>192</octet1>
+              <octet2>168</octet2>
+              <octet3>23</octet3>
+              <octet4>20</octet4>
+              <octet5>1</octet5>
+              <octet6>1</octet6>
+            </sourceAmsNetId>
+            <sourceAmsPort>48898</sourceAmsPort>
+            <commandId>ADS_READ</commandId>
+            <state className="org.apache.plc4x.java.ads.readwrite.State">
+              <initCommand>false</initCommand>
+              <updCommand>false</updCommand>
+              <timestampAdded>false</timestampAdded>
+              <highPriorityCommand>false</highPriorityCommand>
+              <systemCommand>false</systemCommand>
+              <adsCommand>true</adsCommand>
+              <noReturn>false</noReturn>
+              <response>true</response>
+              <broadcast>false</broadcast>
+            </state>
+            <errorCode>0</errorCode>
+            <invokeId>3</invokeId>
+            <data className="org.apache.plc4x.java.ads.readwrite.AdsReadResponse">
+              <result>OK</result>
+              <data>AAAAAAAAAAABAQ==</data>
+            </data>
+          </userdata>
+        </AmsTCPPacket>
+      </incoming-plc-message>
+      <api-response name="Report Read Response to application again">
+        <DefaultPlcReadResponse className="org.apache.plc4x.java.spi.messages.DefaultPlcReadResponse">
+          <request className="org.apache.plc4x.java.spi.messages.DefaultPlcReadRequest">
+            <hurz1 className="org.apache.plc4x.java.ads.field.SymbolicAdsField">
+              <adsDataType>BOOL</adsDataType>
+              <numberOfElements>1</numberOfElements>
+              <symbolicField>main.f_trigDateiGelesen.M</symbolicField>
+            </hurz1>
+          </request>
+          <hurz1>
+            <code>OK</code>
+            <value className="org.apache.plc4x.java.api.value.PlcBoolean">
+              <object>false</object>
+            </value>
+          </hurz1>
+        </DefaultPlcReadResponse>
+      </api-response>
     </steps>
   </testcase>
 
diff --git a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
index ad246e6..073c572 100644
--- a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
+++ b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java
@@ -29,6 +29,7 @@ import org.apache.plc4x.java.api.messages.PlcWriteRequest;
 import org.apache.plc4x.java.api.messages.PlcWriteResponse;
 import org.apache.plc4x.java.api.model.PlcField;
 import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.apache.plc4x.java.api.value.PlcNull;
 import org.apache.plc4x.java.api.value.PlcValue;
 import org.apache.plc4x.java.api.value.PlcValues;
 import org.apache.plc4x.java.s7.readwrite.COTPPacket;
@@ -46,13 +47,11 @@ import org.apache.plc4x.java.s7.readwrite.S7MessageRequest;
 import org.apache.plc4x.java.s7.readwrite.S7MessageResponseData;
 import org.apache.plc4x.java.s7.readwrite.S7MessageUserData;
 import org.apache.plc4x.java.s7.readwrite.S7ParameterReadVarRequest;
-import org.apache.plc4x.java.s7.readwrite.S7ParameterReadVarResponse;
 import org.apache.plc4x.java.s7.readwrite.S7ParameterSetupCommunication;
 import org.apache.plc4x.java.s7.readwrite.S7ParameterUserData;
 import org.apache.plc4x.java.s7.readwrite.S7ParameterUserDataItem;
 import org.apache.plc4x.java.s7.readwrite.S7ParameterUserDataItemCPUFunctions;
 import org.apache.plc4x.java.s7.readwrite.S7ParameterWriteVarRequest;
-import org.apache.plc4x.java.s7.readwrite.S7ParameterWriteVarResponse;
 import org.apache.plc4x.java.s7.readwrite.S7PayloadReadVarResponse;
 import org.apache.plc4x.java.s7.readwrite.S7PayloadUserData;
 import org.apache.plc4x.java.s7.readwrite.S7PayloadUserDataItem;
@@ -96,7 +95,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 /**
@@ -110,7 +108,7 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> {
     public static final Duration REQUEST_TIMEOUT = Duration.ofMillis(10000);
 
     private S7DriverContext s7DriverContext;
-    private static final AtomicInteger tpduGenerator = new AtomicInteger(10);
+    private final AtomicInteger tpduGenerator = new AtomicInteger(10);
     private RequestTransactionManager tm;
 
     @Override
@@ -257,7 +255,6 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> {
             .check(p -> p.getPayload() instanceof S7MessageResponseData)
             .unwrap(p -> (S7MessageResponseData) p.getPayload())
             .check(p -> p.getTpduReference() == tpduId)
-            .check(p -> p.getParameter() instanceof S7ParameterReadVarResponse)
             .handle(p -> {
                 future.complete(p);
                 // Finish the request-transaction.
@@ -296,7 +293,6 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> {
             .check(p -> p.getPayload() instanceof S7MessageResponseData)
             .unwrap(p -> ((S7MessageResponseData) p.getPayload()))
             .check(p -> p.getTpduReference() == tpduId)
-            .check(p -> p.getParameter() instanceof S7ParameterWriteVarResponse)
             .handle(p -> {
                 try {
                     future.complete(((PlcWriteResponse) decodeWriteResponse(p, ((InternalPlcWriteRequest) writeRequest))));
@@ -383,6 +379,33 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> {
     }
 
     private PlcResponse decodeReadResponse(S7MessageResponseData responseMessage, InternalPlcReadRequest plcReadRequest) throws PlcProtocolException {
+        Map<String, ResponseItem<PlcValue>> values = new HashMap<>();
+        // If the result contains any form of non-null error code, handle this instead.
+        if((responseMessage.getErrorClass() != 0) || (responseMessage.getErrorCode() != 0)) {
+            // This is usually the case if PUT/GET wasn't enabled on the PLC
+            if((responseMessage.getErrorClass() == 129) && (responseMessage.getErrorCode() == 4)) {
+                LOGGER.warn("Got an error response from the PLC. This particular response code usually indicates " +
+                    "that PUT/GET is not enabled on the PLC.");
+                for (String fieldName : plcReadRequest.getFieldNames()) {
+                    ResponseItem<PlcValue> result = new ResponseItem<>(PlcResponseCode.ACCESS_DENIED, new PlcNull());
+                    values.put(fieldName, result);
+                }
+                return new DefaultPlcReadResponse(plcReadRequest, values);
+            } else {
+                LOGGER.warn("Got an unknown error response from the PLC. Error Class: {}, Error Code {}. " +
+                    "We probably need to implement explicit handling for this, so please file a bug-report " +
+                    "on https://issues.apache.org/jira/projects/PLC4X and ideally attach a WireShark dump " +
+                    "containing a capture of the communication.",
+                    responseMessage.getErrorClass(), responseMessage.getErrorCode());
+                for (String fieldName : plcReadRequest.getFieldNames()) {
+                    ResponseItem<PlcValue> result = new ResponseItem<>(PlcResponseCode.INTERNAL_ERROR, new PlcNull());
+                    values.put(fieldName, result);
+                }
+                return new DefaultPlcReadResponse(plcReadRequest, values);
+            }
+        }
+
+        // In all other cases all went well.
         S7PayloadReadVarResponse payload = (S7PayloadReadVarResponse) responseMessage.getPayload();
 
         // If the numbers of items don't match, we're in big trouble as the only
@@ -393,7 +416,6 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> {
                 "The number of requested items doesn't match the number of returned items");
         }
 
-        Map<String, ResponseItem<PlcValue>> values = new HashMap<>();
         S7VarPayloadDataItem[] payloadItems = payload.getItems();
         int index = 0;
         for (String fieldName : plcReadRequest.getFieldNames()) {
@@ -415,6 +437,31 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> {
     }
 
     private PlcResponse decodeWriteResponse(S7MessageResponseData responseMessage, InternalPlcWriteRequest plcWriteRequest) throws PlcProtocolException {
+        Map<String, PlcResponseCode> responses = new HashMap<>();
+        // If the result contains any form of non-null error code, handle this instead.
+        if((responseMessage.getErrorClass() != 0) || (responseMessage.getErrorCode() != 0)) {
+            // This is usually the case if PUT/GET wasn't enabled on the PLC
+            if((responseMessage.getErrorClass() == 129) && (responseMessage.getErrorCode() == 4)) {
+                LOGGER.warn("Got an error response from the PLC. This particular response code usually indicates " +
+                    "that PUT/GET is not enabled on the PLC.");
+                for (String fieldName : plcWriteRequest.getFieldNames()) {
+                    responses.put(fieldName, PlcResponseCode.ACCESS_DENIED);
+                }
+                return new DefaultPlcWriteResponse(plcWriteRequest, responses);
+            } else {
+                LOGGER.warn("Got an unknown error response from the PLC. Error Class: {}, Error Code {}. " +
+                        "We probably need to implement explicit handling for this, so please file a bug-report " +
+                        "on https://issues.apache.org/jira/projects/PLC4X and ideally attach a WireShark dump " +
+                        "containing a capture of the communication.",
+                    responseMessage.getErrorClass(), responseMessage.getErrorCode());
+                for (String fieldName : plcWriteRequest.getFieldNames()) {
+                    responses.put(fieldName, PlcResponseCode.INTERNAL_ERROR);
+                }
+                return new DefaultPlcWriteResponse(plcWriteRequest, responses);
+            }
+        }
+
+        // In all other cases all went well.
         S7PayloadWriteVarResponse payload = (S7PayloadWriteVarResponse) responseMessage.getPayload();
 
         // If the numbers of items don't match, we're in big trouble as the only
@@ -425,7 +472,6 @@ public class S7ProtocolLogic extends Plc4xProtocolBase<TPKTPacket> {
                 "The number of requested items doesn't match the number of returned items");
         }
 
-        Map<String, PlcResponseCode> responses = new HashMap<>();
         S7VarPayloadStatusItem[] payloadItems = payload.getItems();
         int index = 0;
         for (String fieldName : plcWriteRequest.getFieldNames()) {
diff --git a/plc4j/drivers/s7/src/test/resources/testsuite/S7DriverIT.xml b/plc4j/drivers/s7/src/test/resources/testsuite/S7DriverIT.xml
index c5ed2e1..4f6311d 100644
--- a/plc4j/drivers/s7/src/test/resources/testsuite/S7DriverIT.xml
+++ b/plc4j/drivers/s7/src/test/resources/testsuite/S7DriverIT.xml
@@ -305,4 +305,79 @@
     </steps>
   </testcase>
 
+  <testcase>
+    <name>Single element read request with disabled PUT/GET</name>
+    <steps>
+      <api-request name="Receive Read Request from application">
+        <TestReadRequest className="org.apache.plc4x.test.driver.model.api.TestReadRequest">
+          <fields>
+            <field className="org.apache.plc4x.test.driver.model.api.TestField">
+              <name>hurz</name>
+              <address>%Q0.0:BOOL</address>
+            </field>
+          </fields>
+        </TestReadRequest>
+      </api-request>
+      <outgoing-plc-message name="Send S7 Read Request">
+        <TPKTPacket className="org.apache.plc4x.java.s7.readwrite.TPKTPacket">
+          <payload className="org.apache.plc4x.java.s7.readwrite.COTPPacketData">
+            <parameters/>
+            <payload className="org.apache.plc4x.java.s7.readwrite.S7MessageRequest">
+              <tpduReference>10</tpduReference>
+              <parameter className="org.apache.plc4x.java.s7.readwrite.S7ParameterReadVarRequest">
+                <items>
+                  <items className="org.apache.plc4x.java.s7.readwrite.S7VarRequestParameterItemAddress">
+                    <address className="org.apache.plc4x.java.s7.readwrite.S7AddressAny">
+                      <transportSize>BOOL</transportSize>
+                      <numberOfElements>1</numberOfElements>
+                      <dbNumber>0</dbNumber>
+                      <area>OUTPUTS</area>
+                      <byteAddress>0</byteAddress>
+                      <bitAddress>0</bitAddress>
+                    </address>
+                  </items>
+                </items>
+              </parameter>
+              <payload/>
+            </payload>
+            <eot>true</eot>
+            <tpduRef>10</tpduRef>
+          </payload>
+        </TPKTPacket>
+      </outgoing-plc-message>
+      <incoming-plc-message name="Receive S7 Read Response">
+        <TPKTPacket className="org.apache.plc4x.java.s7.readwrite.TPKTPacket">
+          <payload className="org.apache.plc4x.java.s7.readwrite.COTPPacketData">
+            <parameters/>
+            <payload className="org.apache.plc4x.java.s7.readwrite.S7MessageResponseData">
+              <tpduReference>10</tpduReference>
+              <errorClass>129</errorClass>
+              <errorCode>4</errorCode>
+            </payload>
+            <eot>true</eot>
+            <tpduRef>0</tpduRef>
+          </payload>
+        </TPKTPacket>
+      </incoming-plc-message>
+      <api-response name="Report Read Response to application">
+        <DefaultPlcReadResponse className="org.apache.plc4x.java.spi.messages.DefaultPlcReadResponse">
+          <request className="org.apache.plc4x.java.spi.messages.DefaultPlcReadRequest">
+            <hurz className="org.apache.plc4x.java.s7.readwrite.field.S7Field">
+              <dataType>BOOL</dataType>
+              <memoryArea>OUTPUTS</memoryArea>
+              <blockNumber>0</blockNumber>
+              <byteOffset>0</byteOffset>
+              <bitOffset>0</bitOffset>
+              <numElements>1</numElements>
+            </hurz>
+          </request>
+          <hurz>
+            <code>ACCESS_DENIED</code>
+            <value/>
+          </hurz>
+        </DefaultPlcReadResponse>
+      </api-response>
+    </steps>
+  </testcase>
+
 </test:driver-testsuite>
\ No newline at end of file
diff --git a/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/DriverTestsuiteRunner.java b/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/DriverTestsuiteRunner.java
index 607dc88..9f922fd 100644
--- a/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/DriverTestsuiteRunner.java
+++ b/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/DriverTestsuiteRunner.java
@@ -145,11 +145,9 @@ public class DriverTestsuiteRunner {
             // Force the driver to not wait for the connection before returning the connection.
             System.setProperty(GeneratedDriverBase.PROPERTY_PLC4X_FORCE_AWAIT_SETUP_COMPLETE, "false");
 
-            PlcConnection connection = getConnection(driverName, driverParameters);
-
             TimeUnit.MILLISECONDS.sleep(200);
 
-            return new DriverTestsuite(testsuiteName, connection, setupSteps, teardownSteps, testcases, bigEndian);
+            return new DriverTestsuite(testsuiteName, driverName, driverParameters, setupSteps, teardownSteps, testcases, bigEndian);
         } catch (DocumentException e) {
             throw new DriverTestsuiteException("Error parsing testsuite xml", e);
         } catch (InterruptedException e) {
@@ -177,12 +175,12 @@ public class DriverTestsuiteRunner {
     }
 
     private void run(DriverTestsuite testsuite, Testcase testcase) throws DriverTestsuiteException {
-        final PlcConnection plcConnection = testsuite.getConnection();
-        final Plc4xEmbeddedChannel embeddedChannel = getEmbeddedChannel(testsuite);
+        final PlcConnection plcConnection = getConnection(testsuite.getDriverName(), testsuite.getDriverParameters());
+        final Plc4xEmbeddedChannel embeddedChannel = getEmbeddedChannel(plcConnection);
         final boolean bigEndian = testsuite.isBigEndian();
         // Be sure this is reset, just in case a previous testcase failed.
         responseFuture = null;
-        if(!testsuite.getSetupSteps().isEmpty()) {
+        if (!testsuite.getSetupSteps().isEmpty()) {
             LOGGER.info("Running setup steps");
             for (TestStep setupStep : testsuite.getSetupSteps()) {
                 executeStep(setupStep, plcConnection, embeddedChannel, bigEndian);
@@ -265,15 +263,18 @@ public class DriverTestsuiteRunner {
                     if(responseFuture == null) {
                         throw new DriverTestsuiteException("No response expected.");
                     }
+                    PlcResponse plcResponse = null;
                     try {
-                        final PlcResponse plcResponse = responseFuture.get(5000, TimeUnit.MILLISECONDS);
-                        // Reset the future.
-                        responseFuture = null;
-                        final String serializedResponse = mapper.writeValueAsString(plcResponse);
-                        validateApiMessage(payload, serializedResponse);
+                        plcResponse = responseFuture.get(5000, TimeUnit.MILLISECONDS);
                     } catch(Exception e) {
-                        throw new DriverTestsuiteException("Got no response within 1000ms.");
+                        throw new DriverTestsuiteException("Got no response within 5000ms.");
                     }
+
+                    // Reset the future.
+                    responseFuture = null;
+                    final String serializedResponse = mapper.writeValueAsString(plcResponse);
+                    validateApiMessage(payload, serializedResponse);
+
                     break;
                 }
                 case DELAY: {
@@ -301,9 +302,9 @@ public class DriverTestsuiteRunner {
         return new TestStep(stepType, stepName, definition);
     }
 
-    private Plc4xEmbeddedChannel getEmbeddedChannel(DriverTestsuite testSuite) {
-        if(testSuite.getConnection() instanceof ChannelExposingConnection) {
-            ChannelExposingConnection connection = (ChannelExposingConnection) testSuite.getConnection();
+    private Plc4xEmbeddedChannel getEmbeddedChannel(PlcConnection plcConnection) {
+        if(plcConnection instanceof ChannelExposingConnection) {
+            ChannelExposingConnection connection = (ChannelExposingConnection) plcConnection;
             Channel channel = connection.getChannel();
             if(channel instanceof Plc4xEmbeddedChannel) {
                 return (Plc4xEmbeddedChannel) channel;
@@ -343,7 +344,7 @@ public class DriverTestsuiteRunner {
 
     private byte[] getOutboundBytes(Plc4xEmbeddedChannel embeddedChannel) throws DriverTestsuiteException {
         ByteBuf byteBuf = null;
-        for(int i = 0; i < 100; i++) {
+        for(int i = 0; i < 500; i++) {
             byteBuf = embeddedChannel.readOutbound();
             if(byteBuf != null) {
                 break;
@@ -351,7 +352,7 @@ public class DriverTestsuiteRunner {
             delay(10);
         }
         if(byteBuf == null) {
-            throw new DriverTestsuiteException("No outbound message available within 1000ms");
+            throw new DriverTestsuiteException("No outbound message available within 5000ms");
         }
         final byte[] data = new byte[byteBuf.readableBytes()];
         byteBuf.readBytes(data);
diff --git a/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/DriverTestsuite.java b/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/DriverTestsuite.java
index 9c8d4a1..586061e 100644
--- a/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/DriverTestsuite.java
+++ b/plc4j/utils/test-utils/src/main/java/org/apache/plc4x/test/driver/model/DriverTestsuite.java
@@ -22,20 +22,24 @@ import org.apache.plc4x.java.api.PlcConnection;
 
 import java.math.BigDecimal;
 import java.util.List;
+import java.util.Map;
 
 public class DriverTestsuite {
 
     private final String name;
-    private final PlcConnection connection;
+    private final String driverName;
+    private final Map<String, String> driverParameters;
     private final List<TestStep> setupSteps;
     private final List<TestStep> teardownSteps;
     private final List<Testcase> testcases;
     private final boolean bigEndian;
 
-    public DriverTestsuite(String name, PlcConnection connection, List<TestStep> setupSteps,
-                           List<TestStep> teardownSteps, List<Testcase> testcases, boolean bigEndian) {
+    public DriverTestsuite(String name, String driverName, Map<String, String> driverParameters,
+                           List<TestStep> setupSteps, List<TestStep> teardownSteps,
+                           List<Testcase> testcases, boolean bigEndian) {
         this.name = name;
-        this.connection = connection;
+        this.driverName = driverName;
+        this.driverParameters = driverParameters;
         this.setupSteps = setupSteps;
         this.teardownSteps = teardownSteps;
         this.testcases = testcases;
@@ -46,8 +50,12 @@ public class DriverTestsuite {
         return name;
     }
 
-    public PlcConnection getConnection() {
-        return connection;
+    public String getDriverName() {
+        return driverName;
+    }
+
+    public Map<String, String>  getDriverParameters() {
+        return driverParameters;
     }
 
     public List<TestStep> getSetupSteps() {


[plc4x] 01/02: PLC4X-245 - [Modbus] Apache NiFi processor throws java.io.IOException after a while

Posted by cd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 72cb97f2e257ac4ef65f0ea6ab2511c974f30d0a
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Mon Aug 31 14:10:11 2020 +0200

    PLC4X-245 - [Modbus] Apache NiFi processor throws java.io.IOException after a while
    
    Made the NiFi integration utilize the PLC connection pool.
---
 .../apache-nifi/nifi-plc4x-processors/pom.xml      |  6 +++
 .../org/apache/plc4x/nifi/BasePlc4xProcessor.java  | 23 +++-----
 .../org/apache/plc4x/nifi/Plc4xSinkProcessor.java  | 50 +++++++++--------
 .../apache/plc4x/nifi/Plc4xSourceProcessor.java    | 63 ++++++++++++----------
 4 files changed, 74 insertions(+), 68 deletions(-)

diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/pom.xml b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/pom.xml
index db8da39..66b49b1 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/pom.xml
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/pom.xml
@@ -33,6 +33,12 @@
       <version>0.8.0-SNAPSHOT</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.plc4x</groupId>
+      <artifactId>plc4j-connection-pool</artifactId>
+      <version>0.8.0-SNAPSHOT</version>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.nifi</groupId>
       <artifactId>nifi-api</artifactId>
     </dependency>
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
index e4c6463..d01f217 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
@@ -24,9 +24,6 @@ import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
-import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.api.PlcConnection;
-import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
 
 import java.util.*;
@@ -62,8 +59,7 @@ public abstract class BasePlc4xProcessor extends AbstractProcessor {
 
     Set<Relationship> relationships;
 
-    private PlcConnection connection;
-
+    private String connectionString;
     private Map<String, String> addressMap;
 
     @Override
@@ -72,8 +68,8 @@ public abstract class BasePlc4xProcessor extends AbstractProcessor {
         this.relationships = new HashSet<>(Arrays.asList(SUCCESS, FAILURE));
     }
 
-    PlcConnection getConnection() {
-        return connection;
+    public String getConnectionString() {
+        return connectionString;
     }
 
     Collection<String> getFields() {
@@ -96,14 +92,7 @@ public abstract class BasePlc4xProcessor extends AbstractProcessor {
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
         PropertyValue property = context.getProperty(PLC_CONNECTION_STRING.getName());
-        if ((connection == null) || !connection.isConnected()) {
-            try {
-                connection = new PlcDriverManager().getConnection(property.getValue());
-            } catch (PlcConnectionException e) {
-                getLogger().error("Error connecting to " + property.getValue(), e);
-            }
-        }
-
+        connectionString = property.getValue();
         addressMap = new HashMap<>();
         PropertyValue addresses = context.getProperty(PLC_ADDRESS_STRING.getName());
         for (String segment : addresses.getValue().split(";")) {
@@ -129,13 +118,13 @@ public abstract class BasePlc4xProcessor extends AbstractProcessor {
         BasePlc4xProcessor that = (BasePlc4xProcessor) o;
         return Objects.equals(descriptors, that.descriptors) &&
             Objects.equals(getRelationships(), that.getRelationships()) &&
-            Objects.equals(getConnection(), that.getConnection()) &&
+            Objects.equals(getConnectionString(), that.getConnectionString()) &&
             Objects.equals(addressMap, that.addressMap);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(super.hashCode(), descriptors, getRelationships(), getConnection(), addressMap);
+        return Objects.hash(super.hashCode(), descriptors, getRelationships(), getConnectionString(), addressMap);
     }
 
     public static class Plc4xConnectionStringValidator implements Validator {
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
index b0e2129..a1c1b2e 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
@@ -31,8 +31,7 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.messages.PlcWriteRequest;
 import org.apache.plc4x.java.api.messages.PlcWriteResponse;
-
-import java.util.concurrent.CompletableFuture;
+import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
 
 @TriggerSerially
 @Tags({"plc4x-sink"})
@@ -51,30 +50,35 @@ public class Plc4xSinkProcessor extends BasePlc4xProcessor {
         }
 
         // Get an instance of a component able to write to a PLC.
-        PlcConnection connection = getConnection();
-        if (!connection.getMetadata().canWrite()) {
-            throw new ProcessException("Writing not supported by connection");
-        }
-
-        // Prepare the request.
-        PlcWriteRequest.Builder builder = connection.writeRequestBuilder();
-        flowFile.getAttributes().forEach((field, value) -> {
-            String address = getAddress(field);
-            if(address != null) {
-                // TODO: Convert the String into the right type ...
-                builder.addItem(field, address, Boolean.valueOf(value));
+        try(PlcConnection connection = new PooledPlcDriverManager().getConnection(getConnectionString())) {
+            if (!connection.getMetadata().canWrite()) {
+                throw new ProcessException("Writing not supported by connection");
             }
-        });
-        PlcWriteRequest writeRequest = builder.build();
 
-        // Send the request to the PLC.
-        try {
-            final PlcWriteResponse plcWriteResponse = writeRequest.execute().get();
-            // TODO: Evaluate the response and create flow files for successful and unsuccessful updates
-            session.transfer(flowFile, SUCCESS);
+            // Prepare the request.
+            PlcWriteRequest.Builder builder = connection.writeRequestBuilder();
+            flowFile.getAttributes().forEach((field, value) -> {
+                String address = getAddress(field);
+                if (address != null) {
+                    // TODO: Convert the String into the right type ...
+                    builder.addItem(field, address, Boolean.valueOf(value));
+                }
+            });
+            PlcWriteRequest writeRequest = builder.build();
+
+            // Send the request to the PLC.
+            try {
+                final PlcWriteResponse plcWriteResponse = writeRequest.execute().get();
+                // TODO: Evaluate the response and create flow files for successful and unsuccessful updates
+                session.transfer(flowFile, SUCCESS);
+            } catch (Exception e) {
+                flowFile = session.putAttribute(flowFile, "exception", e.getLocalizedMessage());
+                session.transfer(flowFile, FAILURE);
+            }
+        } catch (ProcessException e) {
+            throw e;
         } catch (Exception e) {
-            flowFile = session.putAttribute(flowFile, "exception", e.getLocalizedMessage());
-            session.transfer(flowFile, FAILURE);
+            throw new ProcessException("Got an error while trying to get a connection");
         }
     }
 
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
index d043baf..a19c4bb 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
@@ -28,8 +28,10 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -44,39 +46,44 @@ public class Plc4xSourceProcessor extends BasePlc4xProcessor {
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         // Get an instance of a component able to read from a PLC.
-        PlcConnection connection = getConnection();
+        try(PlcConnection connection = new PooledPlcDriverManager().getConnection(getConnectionString())) {
 
-        // Prepare the request.
-        if (!connection.getMetadata().canRead()) {
-            throw new ProcessException("Writing not supported by connection");
-        }
+            // Prepare the request.
+            if (!connection.getMetadata().canRead()) {
+                throw new ProcessException("Writing not supported by connection");
+            }
 
-        FlowFile flowFile = session.create();
-        try {
-            PlcReadRequest.Builder builder = connection.readRequestBuilder();
-            getFields().forEach(field -> {
-                String address = getAddress(field);
-                if(address != null) {
-                    builder.addItem(field, address);
-                }
-            });
-            PlcReadRequest readRequest = builder.build();
-            PlcReadResponse response = readRequest.execute().get();
-            Map<String, String> attributes = new HashMap<>();
-            for (String fieldName : response.getFieldNames()) {
-                for(int i = 0; i < response.getNumberOfValues(fieldName); i++) {
-                    Object value = response.getObject(fieldName, i);
-                    attributes.put(fieldName, String.valueOf(value));
+            FlowFile flowFile = session.create();
+            try {
+                PlcReadRequest.Builder builder = connection.readRequestBuilder();
+                getFields().forEach(field -> {
+                    String address = getAddress(field);
+                    if (address != null) {
+                        builder.addItem(field, address);
+                    }
+                });
+                PlcReadRequest readRequest = builder.build();
+                PlcReadResponse response = readRequest.execute().get();
+                Map<String, String> attributes = new HashMap<>();
+                for (String fieldName : response.getFieldNames()) {
+                    for (int i = 0; i < response.getNumberOfValues(fieldName); i++) {
+                        Object value = response.getObject(fieldName, i);
+                        attributes.put(fieldName, String.valueOf(value));
+                    }
                 }
+                flowFile = session.putAllAttributes(flowFile, attributes);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new ProcessException(e);
+            } catch (ExecutionException e) {
+                throw new ProcessException(e);
             }
-            flowFile = session.putAllAttributes(flowFile, attributes);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new ProcessException(e);
-        } catch (ExecutionException e) {
-            throw new ProcessException(e);
+            session.transfer(flowFile, SUCCESS);
+        } catch (ProcessException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new ProcessException("Got an error while trying to get a connection");
         }
-        session.transfer(flowFile, SUCCESS);
     }
 
 }