You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2018/11/24 21:52:46 UTC

[incubator-plc4x] branch develop updated (5820835 -> 8587eb6)

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git.


    from 5820835  [plc4j-scraper] Minor fixes. Added commons-pool2 versioning.
     new 6edb3f8  - Fixed some typos in the script
     new 8587eb6  PLC4X-58 - [S7] Reading more than PDU with one request blocks calling thread indefinitely

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../plc4x/java/s7/connection/S7PlcConnection.java  |   3 +-
 .../org/apache/plc4x/java/s7/netty/S7Protocol.java |   3 +-
 .../netty/model/types/DataTransportErrorCode.java  |   4 +-
 .../strategies/DefaultS7MessageProcessor.java      | 209 ++++++++++++--------
 .../s7/netty/util/S7RequestSizeCalculator.java     |   1 +
 .../strategies/DefaultS7MessageProcessorTest.java  | 211 +++++++++++++++++++--
 tools/common.sh                                    |  14 +-
 7 files changed, 343 insertions(+), 102 deletions(-)


[incubator-plc4x] 02/02: PLC4X-58 - [S7] Reading more than PDU with one request blocks calling thread indefinitely

Posted by cd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 8587eb6910131bd3a419b9e69db4eb27e7f37a42
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Sat Nov 24 22:52:41 2018 +0100

    PLC4X-58 - [S7] Reading more than PDU with one request blocks calling thread indefinitely
    
    - Now splitting and merging of large items should be supported.
---
 .../plc4x/java/s7/connection/S7PlcConnection.java  |   3 +-
 .../org/apache/plc4x/java/s7/netty/S7Protocol.java |   3 +-
 .../netty/model/types/DataTransportErrorCode.java  |   4 +-
 .../strategies/DefaultS7MessageProcessor.java      | 209 ++++++++++++--------
 .../s7/netty/util/S7RequestSizeCalculator.java     |   1 +
 .../strategies/DefaultS7MessageProcessorTest.java  | 211 +++++++++++++++++++--
 6 files changed, 336 insertions(+), 95 deletions(-)

diff --git a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/connection/S7PlcConnection.java b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/connection/S7PlcConnection.java
index 63609d5..0dbc2a3 100644
--- a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/connection/S7PlcConnection.java
+++ b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/connection/S7PlcConnection.java
@@ -18,6 +18,7 @@
  */
 package org.apache.plc4x.java.s7.connection;
 
+import io.netty.buffer.Unpooled;
 import io.netty.channel.*;
 import org.apache.commons.configuration2.Configuration;
 import org.apache.commons.configuration2.SystemConfiguration;
@@ -225,7 +226,7 @@ public class S7PlcConnection extends NettyPlcConnection implements PlcReader, Pl
             // Send the PLC a message that the connection is being closed.
             DisconnectRequestTpdu disconnectRequest = new DisconnectRequestTpdu(
                 (short) 0x0000, (short) 0x000F, DisconnectReason.NORMAL, Collections.emptyList(),
-                null);
+                Unpooled.EMPTY_BUFFER);
 
             // In case of an ISO TP Class 0 connection, the remote is usually expected to actively
             // close the connection. So we add a listener waiting for this to happen.
diff --git a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/S7Protocol.java b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/S7Protocol.java
index 59480ac..4b17779 100644
--- a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/S7Protocol.java
+++ b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/S7Protocol.java
@@ -681,7 +681,8 @@ public class S7Protocol extends ChannelDuplexHandler {
                 if (!isResponse) {
                     varParamameter = decodeReadWriteVarParameter(in, numItems);
                 } else {
-                    varParamameter = Collections.emptyList();
+                    varParamameter = Collections.singletonList(
+                        new S7AnyVarParameterItem(null, null, null, numItems, (short) 0, (short) 0, (byte) 0));
                 }
                 return new VarParameter(parameterType, varParamameter);
             case SETUP_COMMUNICATION:
diff --git a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/model/types/DataTransportErrorCode.java b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/model/types/DataTransportErrorCode.java
index 4074331..bbbd3f5 100644
--- a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/model/types/DataTransportErrorCode.java
+++ b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/model/types/DataTransportErrorCode.java
@@ -26,11 +26,11 @@ import java.util.Map;
 
 public enum DataTransportErrorCode {
     RESERVED((byte) 0x00),
+    OK((byte) 0xFF),
     ACCESS_DENIED((byte) 0x03),
     INVALID_ADDRESS((byte) 0x05),
     DATA_TYPE_NOT_SUPPORTED((byte) 0x06),
-    NOT_FOUND((byte) 0x0A),
-    OK((byte) 0xFF);
+    NOT_FOUND((byte) 0x0A);
 
     private static final Logger logger = LoggerFactory.getLogger(DataTransportErrorCode.class);
 
diff --git a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/strategies/DefaultS7MessageProcessor.java b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/strategies/DefaultS7MessageProcessor.java
index 7d813a4..fa017ec 100644
--- a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/strategies/DefaultS7MessageProcessor.java
+++ b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/strategies/DefaultS7MessageProcessor.java
@@ -21,6 +21,7 @@ package org.apache.plc4x.java.s7.netty.strategies;
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.plc4x.java.api.exceptions.PlcException;
 import org.apache.plc4x.java.api.exceptions.PlcProtocolException;
+import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
 import org.apache.plc4x.java.base.messages.PlcProtocolMessage;
 import org.apache.plc4x.java.s7.netty.model.messages.S7RequestMessage;
 import org.apache.plc4x.java.s7.netty.model.messages.S7ResponseMessage;
@@ -31,6 +32,7 @@ import org.apache.plc4x.java.s7.netty.model.params.items.VarParameterItem;
 import org.apache.plc4x.java.s7.netty.model.payloads.S7Payload;
 import org.apache.plc4x.java.s7.netty.model.payloads.VarPayload;
 import org.apache.plc4x.java.s7.netty.model.payloads.items.VarPayloadItem;
+import org.apache.plc4x.java.s7.netty.model.types.DataTransportErrorCode;
 import org.apache.plc4x.java.s7.netty.model.types.MessageType;
 import org.apache.plc4x.java.s7.netty.model.types.ParameterType;
 import org.apache.plc4x.java.s7.netty.model.types.TransportSize;
@@ -116,26 +118,60 @@ public class DefaultS7MessageProcessor implements S7MessageProcessor {
         compositeRequestMessage.addRequestMessage(subMessage);
 
         // This calculates the size of the header for the request and response.
-        int curRequestSize = S7RequestSizeCalculator.getRequestMessageSize(subMessage);
-        int curResponseSize = S7ResponseSizeEstimator.getEstimatedResponseMessageSize(subMessage);
+        int initialRequestSize = S7RequestSizeCalculator.getRequestMessageSize(subMessage);
+        int curRequestSize = initialRequestSize;
+        int initialResponseSize = S7ResponseSizeEstimator.getEstimatedResponseMessageSize(subMessage);
+        int curResponseSize = initialResponseSize;
 
-        // For each var item of the original request, try adding them to the current sub-message
-        // as long as it or the resulting response does not exceed the max PDU size.
+        VarParameter preProcessedVarParameter = new VarParameter(varParameter.getType(), new LinkedList<>());
         for (VarParameterItem varParameterItem : varParameter.getItems()) {
-            VarPayloadItem varPayloadItem = null;
-            Optional<VarPayloadItem> payloadItem = request.getPayload(VarPayloadItem.class);
-            if (payloadItem.isPresent()) {
-                varPayloadItem = payloadItem.get();
+            // Use the S7RequestSizeCalculator to calculate the actual and estimated item sizes.
+            int itemRequestSize = S7RequestSizeCalculator.getRequestItemTotalSize(
+                varParameterItem, null);
+            int itemResponseSize = S7ResponseSizeEstimator.getEstimatedResponseReadItemTotalSize(
+                varParameterItem, null);
+
+            // If the item would not fit into a separate message, we have to split it.
+            if((initialRequestSize + itemRequestSize > pduSize) || (initialResponseSize + itemResponseSize > pduSize)) {
+                // The max payload size of one item is the PDU size minus the size of the empty response, minus the type and num-items (each one byte) of one VarParameter, minus the 4-byte header of one VarPayloadItem
+                int maxResponseSize = pduSize - (initialResponseSize + 2 + 4);
+
+                S7AnyVarParameterItem s7AnyVarParameterItem = ((S7AnyVarParameterItem) varParameterItem);
+                int maxNumElements = (int) Math.floor(
+                    (double) maxResponseSize / (double) s7AnyVarParameterItem.getDataType().getSizeInBytes());
+                int sizeMaxNumElementInBytes = maxNumElements * s7AnyVarParameterItem.getDataType().getSizeInBytes();
+                int remainingNumElements = s7AnyVarParameterItem.getNumElements();
+                short curByteOffset = s7AnyVarParameterItem.getByteOffset();
+
+                while(remainingNumElements > 0) {
+                    int numCurElements = Math.min(remainingNumElements, maxNumElements);
+                    VarParameterItem subVarParameterItem = new S7AnyVarParameterItem(
+                        s7AnyVarParameterItem.getSpecificationType(), s7AnyVarParameterItem.getMemoryArea(),
+                        s7AnyVarParameterItem.getDataType(), numCurElements, s7AnyVarParameterItem.getDataBlockNumber(),
+                        curByteOffset, (byte) 0);
+                    preProcessedVarParameter.getItems().add(subVarParameterItem);
+
+                    remainingNumElements -= maxNumElements;
+                    curByteOffset += sizeMaxNumElementInBytes;
+                }
+            }
+            // In all other cases, just forward the item.
+            else {
+                preProcessedVarParameter.getItems().add(varParameterItem);
             }
+        }
 
+        // For each var item of the original request, try adding them to the current sub-message
+        // as long as it or the resulting response does not exceed the max PDU size.
+        for (VarParameterItem varParameterItem : preProcessedVarParameter.getItems()) {
             // Use the S7RequestSizeCalculator to calculate the actual and estimated item sizes.
             int itemRequestSize = S7RequestSizeCalculator.getRequestItemTotalSize(
-                varParameterItem, varPayloadItem);
+                varParameterItem, null);
             int itemResponseSize = S7ResponseSizeEstimator.getEstimatedResponseReadItemTotalSize(
-                varParameterItem, varPayloadItem);
+                varParameterItem, null);
 
-            // When adding this item to the request we would exceed the pdu size in
-            // the request or response, so we have to create a new sub-message.
+            // If adding this item would exceed either the request or response size,
+            // create a new sub-message and add this item to that.
             if ((curRequestSize + itemRequestSize > pduSize) || (curResponseSize + itemResponseSize > pduSize)) {
                 // Create a new var parameter without any items (yet).
                 subVarParameter = new VarParameter(varParameter.getType(), new LinkedList<>());
@@ -147,8 +183,8 @@ public class DefaultS7MessageProcessor implements S7MessageProcessor {
                     Collections.emptyList(), compositeRequestMessage);
 
                 // Reset the message size
-                curRequestSize = S7RequestSizeCalculator.getRequestMessageSize(subMessage);
-                curResponseSize = S7ResponseSizeEstimator.getEstimatedResponseMessageSize(subMessage);
+                curRequestSize = S7RequestSizeCalculator.getRequestMessageSize(subMessage) + itemRequestSize;
+                curResponseSize = S7ResponseSizeEstimator.getEstimatedResponseMessageSize(subMessage) + itemResponseSize;
 
                 // Add this new sub-message to the composite.
                 compositeRequestMessage.addRequestMessage(subMessage);
@@ -158,7 +194,6 @@ public class DefaultS7MessageProcessor implements S7MessageProcessor {
                 curResponseSize += itemResponseSize;
             }
 
-            // Add the item to the current subVarParameter.
             subVarParameter.getItems().add(varParameterItem);
         }
         return compositeRequestMessage;
@@ -177,9 +212,10 @@ public class DefaultS7MessageProcessor implements S7MessageProcessor {
         List<VarParameterItem> parameterItems = varParameter.getItems();
         List<VarPayloadItem> payloadItems = varPayload.getItems();
 
-        for (int i1 = 0; i1 < parameterItems.size(); i1++) {
-            VarParameterItem varParameterItem = parameterItems.get(i1);
-            VarPayloadItem varPayloadItem = payloadItems.get(i1);
+        for (int i = 0; i < parameterItems.size(); i++) {
+            VarParameterItem varParameterItem = parameterItems.get(i);
+            VarPayloadItem varPayloadItem = payloadItems.get(i);
+
             if (varParameterItem instanceof S7AnyVarParameterItem) {
                 S7AnyVarParameterItem s7AnyVarParameterItem = (S7AnyVarParameterItem) varParameterItem;
                 short byteOffset = s7AnyVarParameterItem.getByteOffset();
@@ -314,81 +350,96 @@ public class DefaultS7MessageProcessor implements S7MessageProcessor {
 
     private S7ResponseMessage getMergedResponseMessage(S7RequestMessage requestMessage,
                                                        Collection<? extends S7ResponseMessage> responses) {
-
-        S7ResponseMessage firstResponse = null;
+        MessageType messageType = null;
         short tpduReference = requestMessage.getTpduReference();
         List<S7Parameter> s7Parameters = new LinkedList<>();
         List<S7Payload> s7Payloads = new LinkedList<>();
-        byte errorClass = 0;
-        byte errorCode = 0;
-        VarParameter readVarParameter = null;
-        VarParameter writeVarParameter = null;
-        VarPayload readVarPayload = null;
-        VarPayload writeVarPayload = null;
-
-        // TODO: We should change this code to not use the lists of the first parameter or payload as this can cause problems when using mutable lists.
-        for (S7ResponseMessage response : responses) {
-            if(firstResponse == null) {
-                firstResponse = response;
+
+        Optional<VarParameter> varParameterOptional = requestMessage.getParameter(VarParameter.class);
+
+        // This is neither a read request nor a write request, just merge all parameters together.
+        if(!varParameterOptional.isPresent()) {
+            for (S7ResponseMessage response : responses) {
+                messageType = response.getMessageType();
+                s7Parameters.addAll(response.getParameters());
+                s7Payloads.addAll(response.getPayloads());
             }
-            // Some parameters have to be merged. In case of read and write parameters
-            // their items have to be merged into one single parameter.
-            for(S7Parameter parameter : response.getParameters()) {
-                if (parameter.getType() == ParameterType.READ_VAR) {
-                    if (readVarParameter == null) {
-                        readVarParameter = (VarParameter) parameter;
-                        s7Parameters.add(parameter);
-                    } else {
-                        readVarParameter.mergeParameter((VarParameter) parameter);
-                    }
-                } else if (parameter.getType() == ParameterType.WRITE_VAR) {
-                    if (writeVarParameter == null) {
-                        writeVarParameter = (VarParameter) parameter;
-                        s7Parameters.add(parameter);
-                    } else {
-                        writeVarParameter.mergeParameter((VarParameter) parameter);
-                    }
-                } else {
-                    s7Parameters.add(parameter);
-                }
+        }
+
+        // This is a read or write request, we have to merge all the items in the var parameter.
+        else {
+            List<VarParameterItem> parameterItems = new LinkedList<>();
+            List<VarPayloadItem> payloadItems = new LinkedList<>();
+            for (S7ResponseMessage response : responses) {
+                messageType = response.getMessageType();
+                parameterItems.addAll(response.getParameter(VarParameter.class)
+                    .orElseThrow(() -> new PlcRuntimeException(
+                        "Every response of a Read message should have a VarParameter instance")).getItems());
+                Optional<VarPayload> payload = response.getPayload(VarPayload.class);
+                payload.ifPresent(varPayload -> payloadItems.addAll(varPayload.getItems()));
             }
 
-            // Some payloads have to be merged. In case of read and write payloads
-            // their items have to be merged into one single payload.
-            for(S7Payload payload : response.getPayloads()) {
-                if(payload.getType() == ParameterType.READ_VAR) {
-                    if (readVarPayload == null) {
-                        readVarPayload = (VarPayload) payload;
-                    } else {
-                        s7Payloads.remove(readVarPayload);
-                        readVarPayload = readVarPayload.mergePayload((VarPayload) payload);
-                    }
-                    s7Payloads.add(readVarPayload);
-                } else if(payload.getType() == ParameterType.WRITE_VAR) {
-                    if(writeVarPayload == null) {
-                        writeVarPayload = (VarPayload) payload;
-                    } else {
-                        s7Payloads.remove(writeVarPayload);
-                        writeVarPayload = writeVarPayload.mergePayload((VarPayload) payload);
+            List<VarParameterItem> mergedParameterItems = new LinkedList<>();
+            List<VarPayloadItem> mergedPayloadItems = new LinkedList<>();
+            VarParameter varParameter = varParameterOptional.get();
+
+            int responseOffset = 0;
+            for(int i = 0; i < varParameter.getItems().size(); i++) {
+                S7AnyVarParameterItem requestItem = (S7AnyVarParameterItem) varParameter.getItems().get(i);
+
+                // Get the pairs of corresponding parameter and payload items.
+                S7AnyVarParameterItem responseParameterItem = (S7AnyVarParameterItem) parameterItems.get(i + responseOffset);
+                VarPayloadItem responsePayloadItem = payloadItems.get(i + responseOffset);
+                int dataOffset = (responsePayloadItem.getData() != null) ? responsePayloadItem.getData().length : 0;
+
+                // The resulting parameter item is identical to the request parameter item.
+                mergedParameterItems.add(requestItem);
+
+                // The payload will have to be merged and the return codes will have to be examined.
+                if(requestItem.getNumElements() != responseParameterItem.getNumElements()) {
+                    int totalSizeInBytes = requestItem.getNumElements() * requestItem.getDataType().getSizeInBytes();
+                    byte[] data = new byte[totalSizeInBytes];
+                    System.arraycopy(responsePayloadItem.getData(), 0, data, 0, responsePayloadItem.getData().length);
+
+                    // Initialize the current size. This will be lower than the original, as the only
+                    // way to have a different count is if the request was split up.
+                    int curSize = responseParameterItem.getNumElements();
+
+                    // Now iterate over the succeeding pairs of parameters and payloads till we have
+                    // found the original number of elements.
+                    while(curSize < totalSizeInBytes) {
+                        responseOffset++;
+                        // No need to process the parameters, we only need them to get the number of items.
+                        responseParameterItem = (S7AnyVarParameterItem) parameterItems.get(i + responseOffset);
+                        curSize += responseParameterItem.getNumElements();
+
+                        // Get the next payload item in the list.
+                        responsePayloadItem = payloadItems.get(i + responseOffset);
+
+                        // Copy the data of this item behind the previous content.
+                        System.arraycopy(responsePayloadItem.getData(), 0, data, dataOffset, responsePayloadItem.getData().length);
+                        dataOffset += responsePayloadItem.getData().length;
                     }
-                    s7Payloads.add(writeVarPayload);
+
+                    mergedPayloadItems.add(new VarPayloadItem(DataTransportErrorCode.OK,
+                        responsePayloadItem.getDataTransportSize(), data));
                 } else {
-                    s7Payloads.add(payload);
+                    mergedPayloadItems.add(responsePayloadItem);
                 }
             }
+
+            s7Parameters.add(new VarParameter(varParameter.getType(), mergedParameterItems));
+            s7Payloads.add(new VarPayload(varParameter.getType(), mergedPayloadItems));
         }
-        if(firstResponse != null) {
-            MessageType messageType = firstResponse.getMessageType();
-            return new S7ResponseMessage(messageType, tpduReference, s7Parameters, s7Payloads, errorClass, errorCode);
-        }
-        return null;
+        // TODO: The error codes are wrong
+        return new S7ResponseMessage(messageType, tpduReference, s7Parameters, s7Payloads, (byte) 0xFF, (byte) 0xFF);
     }
 
     static class S7CompositeRequestMessage implements PlcProtocolMessage {
 
         private S7RequestMessage originalRequest;
-        private Collection<S7RequestMessage> requestMessages;
-        private Collection<S7ResponseMessage> responseMessages;
+        private List<S7RequestMessage> requestMessages;
+        private List<S7ResponseMessage> responseMessages;
 
         S7CompositeRequestMessage(S7RequestMessage originalRequest) {
             this.originalRequest = originalRequest;
@@ -419,7 +470,7 @@ public class DefaultS7MessageProcessor implements S7MessageProcessor {
             requestMessages.add(requestMessage);
         }
 
-        private Collection<S7RequestMessage> getRequestMessages() {
+        public List<S7RequestMessage> getRequestMessages() {
             return requestMessages;
         }
 
@@ -427,7 +478,7 @@ public class DefaultS7MessageProcessor implements S7MessageProcessor {
             responseMessages.add(responseMessage);
         }
 
-        private Collection<S7ResponseMessage> getResponseMessages() {
+        public List<S7ResponseMessage> getResponseMessages() {
             return responseMessages;
         }
     }
diff --git a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/util/S7RequestSizeCalculator.java b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/util/S7RequestSizeCalculator.java
index c8feeee..392c1c9 100644
--- a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/util/S7RequestSizeCalculator.java
+++ b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/util/S7RequestSizeCalculator.java
@@ -146,6 +146,7 @@ public class S7RequestSizeCalculator {
     private static short getRequestWriteVarPayloadItemSize(VarPayloadItem varPayloadItem) {
         // A var payload item always has a minimum size of 4 bytes (return code, transport size, size (two bytes))
         short length = 4;
+        // Data is a byte array ... so there is no need to translate this into number of bytes.
         length += varPayloadItem.getData().length;
         // It seems that bit payloads need a additional separating 0x00 byte.
         if(varPayloadItem.getDataTransportSize().isSizeInBits()) {
diff --git a/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/netty/strategies/DefaultS7MessageProcessorTest.java b/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/netty/strategies/DefaultS7MessageProcessorTest.java
index a92d2b8..3b6405d 100644
--- a/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/netty/strategies/DefaultS7MessageProcessorTest.java
+++ b/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/netty/strategies/DefaultS7MessageProcessorTest.java
@@ -21,9 +21,11 @@ package org.apache.plc4x.java.s7.netty.strategies;
 import org.apache.plc4x.java.api.exceptions.PlcException;
 import org.apache.plc4x.java.s7.netty.model.messages.S7RequestMessage;
 import org.apache.plc4x.java.s7.netty.model.messages.S7ResponseMessage;
+import org.apache.plc4x.java.s7.netty.model.params.S7Parameter;
 import org.apache.plc4x.java.s7.netty.model.params.VarParameter;
 import org.apache.plc4x.java.s7.netty.model.params.items.S7AnyVarParameterItem;
 import org.apache.plc4x.java.s7.netty.model.params.items.VarParameterItem;
+import org.apache.plc4x.java.s7.netty.model.payloads.S7Payload;
 import org.apache.plc4x.java.s7.netty.model.payloads.VarPayload;
 import org.apache.plc4x.java.s7.netty.model.payloads.items.VarPayloadItem;
 import org.apache.plc4x.java.s7.netty.model.types.*;
@@ -33,6 +35,7 @@ import org.junit.Test;
 
 import java.util.*;
 
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
 import static org.hamcrest.core.Is.is;
@@ -256,6 +259,36 @@ public class DefaultS7MessageProcessorTest {
     }
 
     /**
+     * In this request, the request size itself is way within the bounds set by the PDU size parameter,
+     * however the estimated size of the response would exceed this greatly. In contrast to the test above
+     * a single item is too large for being sent at all. So in this case the item should be split up into
+     * multiple elements and joined together after all sub-messages are returned.
+     *
+     * @throws PlcException something went wrong.
+     */
+    @Test
+    public void readMessageWithTooLargeResponseSize() throws PlcException {
+        S7RequestMessage request = createReadMessage(
+            Collections.singletonList(
+                new S7AnyVarParameterItem(SpecificationType.VARIABLE_SPECIFICATION, MemoryArea.DATA_BLOCKS,
+                    TransportSize.REAL, (short) 400, (short) 1, (short) 0, (byte) 0)));
+        Collection<S7RequestMessage> processedRequests = SUT.processRequest(request, 256);
+
+        assertThat(processedRequests, notNullValue());
+        assertThat(processedRequests, hasSize(7));
+
+        int totalNumItems = 0;
+        for (S7RequestMessage requestMessage : processedRequests) {
+            Optional<VarParameter> parameter = requestMessage.getParameter(VarParameter.class);
+            assertThat(parameter.isPresent(), is(true));
+            VarParameter varParameter = parameter.get();
+            assertThat(varParameter.getItems(), hasSize(1));
+            totalNumItems += ((S7AnyVarParameterItem) varParameter.getItems().iterator().next()).getNumElements();
+        }
+        assertThat(totalNumItems, equalTo(400));
+    }
+
+    /**
      * In this request, we only send one single element to one single field. Nothing should be changed.
      *
      * @throws PlcException something went wrong.
@@ -310,7 +343,7 @@ public class DefaultS7MessageProcessorTest {
         // Initialize a set of expected fields.
         Set<String> expectedFields = new HashSet<>(10);
         for(int i = 0; i < 10; i++) {
-            expectedFields.add(Integer.toString(i / 8) + "/" + Integer.toString(i % 8));
+            expectedFields.add(i / 8 + "/" + i % 8);
         }
 
         // We are expecting to receive 10 messages as we had an array of 10 items.
@@ -334,8 +367,7 @@ public class DefaultS7MessageProcessorTest {
             assertThat(s7AnyParameterItem.getMemoryArea(), is(MemoryArea.DATA_BLOCKS));
             assertThat(s7AnyParameterItem.getDataType(), is(TransportSize.BOOL));
             assertThat(s7AnyParameterItem.getNumElements(), is(1));
-            String fieldString = Short.toString(
-                s7AnyParameterItem.getByteOffset()) + "/" + Byte.toString(s7AnyParameterItem.getBitOffset());
+            String fieldString = s7AnyParameterItem.getByteOffset() + "/" + s7AnyParameterItem.getBitOffset();
             assertThat(expectedFields, IsCollectionContaining.hasItem(fieldString));
 
             VarPayloadItem payloadItem = varPayload.getItems().iterator().next();
@@ -600,15 +632,42 @@ public class DefaultS7MessageProcessorTest {
     @Test
     public void processCompositeMessageReadResponse() throws PlcException {
         S7RequestMessage originalRequestMessage = new S7RequestMessage(MessageType.JOB, (short) 1,
-            Collections.emptyList(), Collections.emptyList(), null);
+            Collections.singletonList(
+                new VarParameter(ParameterType.READ_VAR,
+                    Arrays.asList(
+                        new S7AnyVarParameterItem(SpecificationType.VARIABLE_SPECIFICATION, MemoryArea.DATA_BLOCKS,
+                            TransportSize.BYTE, (short) 1, (short) 1, (short) 2, (byte) 0),
+                        new S7AnyVarParameterItem(SpecificationType.VARIABLE_SPECIFICATION, MemoryArea.DATA_BLOCKS,
+                            TransportSize.BYTE, (short) 1, (short) 3, (short) 4, (byte) 0))
+                )
+            ),
+            Collections.emptyList(),
+            null);
         DefaultS7MessageProcessor.S7CompositeRequestMessage compositeRequestMessage =
             new DefaultS7MessageProcessor.S7CompositeRequestMessage(originalRequestMessage);
 
-        S7RequestMessage fragment1RequestMessage = new S7RequestMessage(MessageType.JOB, (short) 2,
-            Collections.emptyList(), Collections.emptyList(), compositeRequestMessage);
+        S7RequestMessage fragment1RequestMessage = new S7RequestMessage(MessageType.JOB, (short) 1,
+            Collections.singletonList(
+                new VarParameter(ParameterType.READ_VAR,
+                    Arrays.asList(
+                        new S7AnyVarParameterItem(SpecificationType.VARIABLE_SPECIFICATION, MemoryArea.DATA_BLOCKS,
+                            TransportSize.BYTE, (short) 1, (short) 1, (short) 2, (byte) 0))
+                )
+            ),
+            Collections.emptyList(),
+            compositeRequestMessage);
         compositeRequestMessage.addRequestMessage(fragment1RequestMessage);
+
         S7RequestMessage fragment2RequestMessage = new S7RequestMessage(MessageType.JOB, (short) 3,
-            Collections.emptyList(), Collections.emptyList(), compositeRequestMessage);
+            Collections.singletonList(
+                new VarParameter(ParameterType.READ_VAR,
+                    Arrays.asList(
+                        new S7AnyVarParameterItem(SpecificationType.VARIABLE_SPECIFICATION, MemoryArea.DATA_BLOCKS,
+                            TransportSize.BYTE, (short) 1, (short) 3, (short) 4, (byte) 0))
+                )
+            ),
+            Collections.emptyList(),
+            compositeRequestMessage);
         compositeRequestMessage.addRequestMessage(fragment2RequestMessage);
 
         // Virtually add a response for the first response.
@@ -658,22 +717,81 @@ public class DefaultS7MessageProcessorTest {
 
     /**
      * This test handles the special case in which a response is part of a single request message.
-     * This means that it is immediatly finished and is hereby immediatly processed.
+     * This means that it is immediately finished and is hereby immediately processed.
+     *
+     * @throws PlcException
+     */
+    @Test
+    public void processCompositeMessageMergedReadResponse() throws PlcException {
+        // Produce a composite request, consisting of an original request reading 4*10=40 items,
+        // which is split up into 4 sub-messages with 10 items each.
+        DefaultS7MessageProcessor.S7CompositeRequestMessage compositeRequestMessage = createCompositeReadMessage(4, 10);
+
+        List<S7RequestMessage> requestMessages = compositeRequestMessage.getRequestMessages();
+        // Generate dummy responses for each sub request.
+        List<S7ResponseMessage> responseMessages = createResponseMessages(requestMessages);
+
+        // Iterate over all pairs of requests and responses and have the processor process that.
+        S7ResponseMessage processedResponse = null;
+        int size = requestMessages.size();
+        for (int i = 0; i < size; i++) {
+            S7RequestMessage requestMessage = requestMessages.get(i);
+            S7ResponseMessage responseMessage = responseMessages.get(i);
+            requestMessage.setAcknowledged(true);
+
+            processedResponse = SUT.processResponse(requestMessage, responseMessage);
+            // Only after processing the last response, should the processor return something not null.
+            if(i < size - 1) {
+                assertThat(processedResponse, nullValue());
+            }
+        }
+
+        // Check the content.
+        assertThat(processedResponse, notNullValue());
+        assertThat(processedResponse.getParameters(), hasSize(1));
+        assertThat(processedResponse.getParameter(VarParameter.class).isPresent(), is(true));
+        VarParameter varParameter = processedResponse.getParameter(VarParameter.class).get();
+        assertThat(varParameter.getItems(), hasSize(1));
+
+        assertThat(processedResponse.getPayloads(), hasSize(1));
+        assertThat(processedResponse.getPayload(VarPayload.class).isPresent(), is(true));
+
+        VarPayload varPayload = processedResponse.getPayload(VarPayload.class).get();
+        assertThat(varPayload.getItems(), hasSize(1));
+    }
+
+    /**
+     * This test handles the special case in which a response is part of a single request message.
+     * This means that it is immediately finished and is hereby immediately processed.
      *
      * @throws PlcException
      */
     @Test
     public void processCompositeMessageWriteResponse() throws PlcException {
         S7RequestMessage originalRequestMessage = new S7RequestMessage(MessageType.JOB, (short) 1,
-            Collections.emptyList(), Collections.emptyList(), null);
+            Collections.singletonList(
+                new VarParameter(ParameterType.WRITE_VAR, new LinkedList<>(Arrays.asList(
+                    new S7AnyVarParameterItem(SpecificationType.VARIABLE_SPECIFICATION, MemoryArea.DATA_BLOCKS,
+                        TransportSize.BYTE, (short) 1, (short) 1, (short) 2, (byte) 0),
+                    new S7AnyVarParameterItem(SpecificationType.VARIABLE_SPECIFICATION, MemoryArea.DATA_BLOCKS,
+                        TransportSize.BYTE, (short) 1, (short) 1, (short) 3, (byte) 0))))),
+            Collections.emptyList(), null);
         DefaultS7MessageProcessor.S7CompositeRequestMessage compositeRequestMessage =
             new DefaultS7MessageProcessor.S7CompositeRequestMessage(originalRequestMessage);
 
         S7RequestMessage fragment1RequestMessage = new S7RequestMessage(MessageType.JOB, (short) 2,
-            Collections.emptyList(), Collections.emptyList(), compositeRequestMessage);
+            Collections.singletonList(
+                new VarParameter(ParameterType.WRITE_VAR, new LinkedList<>(Collections.singletonList(
+                    new S7AnyVarParameterItem(SpecificationType.VARIABLE_SPECIFICATION, MemoryArea.DATA_BLOCKS,
+                        TransportSize.BYTE, (short) 1, (short) 1, (short) 2, (byte) 0))))),
+            Collections.emptyList(), compositeRequestMessage);
         compositeRequestMessage.addRequestMessage(fragment1RequestMessage);
         S7RequestMessage fragment2RequestMessage = new S7RequestMessage(MessageType.JOB, (short) 3,
-            Collections.emptyList(), Collections.emptyList(), compositeRequestMessage);
+            Collections.singletonList(
+                new VarParameter(ParameterType.WRITE_VAR, new LinkedList<>(Collections.singletonList(
+                    new S7AnyVarParameterItem(SpecificationType.VARIABLE_SPECIFICATION, MemoryArea.DATA_BLOCKS,
+                        TransportSize.BYTE, (short) 1, (short) 1, (short) 3, (byte) 0))))),
+            Collections.emptyList(), compositeRequestMessage);
         compositeRequestMessage.addRequestMessage(fragment2RequestMessage);
 
         // Virtually add a response for the first response.
@@ -697,14 +815,16 @@ public class DefaultS7MessageProcessorTest {
             Collections.singletonList(
                 new VarParameter(ParameterType.WRITE_VAR, new LinkedList<>(Collections.singletonList(
                     new S7AnyVarParameterItem(SpecificationType.VARIABLE_SPECIFICATION, MemoryArea.DATA_BLOCKS,
-                        TransportSize.BYTE, (short) 1, (short) 3, (short) 4, (byte) 0))))),
+                        TransportSize.BYTE, (short) 1, (short) 1, (short) 3, (byte) 0))))),
             Collections.singletonList(
                 new VarPayload(ParameterType.WRITE_VAR, new LinkedList<>(Collections.singletonList(
                     new VarPayloadItem(DataTransportErrorCode.OK, DataTransportSize.BYTE_WORD_DWORD, new byte[]{0x23}))))),
             (byte) 0x00, (byte) 0x00);
+
         // This time we expect all messages of the composite to be acknowledged and the processResponse should
         // return a merged version of the individual responses content.
         processedResponse = SUT.processResponse(fragment2RequestMessage, fragment2ResponseMessage);
+
         // As this is the last request being responded, the result should be not null this time.
         assertThat(processedResponse, notNullValue());
 
@@ -741,4 +861,71 @@ public class DefaultS7MessageProcessorTest {
             null);
     }
 
+    private DefaultS7MessageProcessor.S7CompositeRequestMessage createCompositeReadMessage(int numSubMessages, int numItemsPerSubMessage) {
+        int totalItems = numSubMessages * numItemsPerSubMessage;
+        S7RequestMessage originalRequestMessage = new S7RequestMessage(MessageType.JOB, (short) 1,
+            Collections.singletonList(
+                new VarParameter(ParameterType.READ_VAR,
+                    Collections.singletonList(
+                        new S7AnyVarParameterItem(SpecificationType.VARIABLE_SPECIFICATION, MemoryArea.DATA_BLOCKS,
+                            TransportSize.BYTE, (short) totalItems, (short) 1, (short) 0, (byte) 0))
+                )
+            ),
+            Collections.emptyList(),
+            null);
+        DefaultS7MessageProcessor.S7CompositeRequestMessage compositeRequestMessage =
+            new DefaultS7MessageProcessor.S7CompositeRequestMessage(originalRequestMessage);
+
+        int curPos = 0;
+        for(int i = 0; i < numSubMessages; i++) {
+            S7RequestMessage subMessage = new S7RequestMessage(MessageType.JOB, (short) 1,
+                Collections.singletonList(
+                    new VarParameter(ParameterType.READ_VAR,
+                        Collections.singletonList(
+                            new S7AnyVarParameterItem(SpecificationType.VARIABLE_SPECIFICATION, MemoryArea.DATA_BLOCKS,
+                                TransportSize.BYTE, (short) numItemsPerSubMessage, (short) 1, (short) curPos, (byte) 0))
+                    )
+                ),
+                Collections.emptyList(),
+                compositeRequestMessage);
+            curPos += numItemsPerSubMessage;
+            compositeRequestMessage.addRequestMessage(subMessage);
+        }
+
+        return compositeRequestMessage;
+    }
+
+    private DefaultS7MessageProcessor.S7CompositeRequestMessage createCompositeWriteMessage() {
+        return null;
+    }
+
+    private List<S7ResponseMessage> createResponseMessages(Collection<S7RequestMessage> requests) {
+        List<S7ResponseMessage> responses = new ArrayList<>(requests.size());
+        byte counter = 0;
+        for (S7RequestMessage request : requests) {
+            List<S7Payload> payloads = new ArrayList<>(request.getParameters().size());
+            for (S7Parameter parameter : request.getParameters()) {
+                VarParameter varParameter = (VarParameter) parameter;
+                List<VarPayloadItem> varPayloadItems = new ArrayList<>(varParameter.getItems().size());
+                for (VarParameterItem item : varParameter.getItems()) {
+                    S7AnyVarParameterItem anyVarParameterItem = (S7AnyVarParameterItem) item;
+                    byte[] data = new byte[
+                        anyVarParameterItem.getDataType().getSizeInBytes() * anyVarParameterItem.getNumElements()];
+                    for(int i = 0; i < data.length; i++) {
+                        data[i] = counter++;
+                    }
+                    VarPayloadItem payloadItem = new VarPayloadItem(DataTransportErrorCode.OK,
+                        anyVarParameterItem.getDataType().getDataTransportSize(), data);
+                    varPayloadItems.add(payloadItem);
+                }
+                S7Payload payload = new VarPayload(parameter.getType(), varPayloadItems);
+                payloads.add(payload);
+            }
+            S7ResponseMessage response = new S7ResponseMessage(request.getMessageType(), request.getTpduReference(),
+                request.getParameters(), payloads, (byte) 0xFF, (byte) 0xFF);
+            responses.add(response);
+        }
+        return responses;
+    }
+
 }


[incubator-plc4x] 01/02: - Fixed some typos in the script

Posted by cd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 6edb3f8b92cad564439fbce166e540f6b2bf9fc4
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Sat Nov 24 22:24:42 2018 +0100

    - Fixed some typos in the script
---
 tools/common.sh | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/tools/common.sh b/tools/common.sh
index e899f9a..5994911 100755
--- a/tools/common.sh
+++ b/tools/common.sh
@@ -87,15 +87,15 @@ function dieSuperceeded { # no args
   die "This tool is superceeded with the new maven build tooling.  See src/site/asciidoc/releasing.adoc."
 }
 
-function checkPLX4XSourceRootGitDie { # no args; dies if !ok
-  [ -d "${PLC4X_ROOT_DIR}/.git" ] || die "Not an PLX4X source root git directory \"${PLC4X_ROOT_DIR}\""
+function checkPLC4XSourceRootGitDie { # no args; dies if !ok
+  [ -d "${PLC4X_ROOT_DIR}/.git" ] || die "Not an PLC4X source root git directory \"${PLC4X_ROOT_DIR}\""
 }
 
 function checkUsingMgmtCloneWarn() { # no args; warns if plc4x root isn't a mgmt clone
   CLONE_DIR=`cd ${PLC4X_ROOT_DIR}; pwd`
   CLONE_DIRNAME=`basename $CLONE_DIR`
   if [ ! `echo $CLONE_DIRNAME | grep -o -E '^mgmt-plc4x'` ]; then
-    echo "Warning: the PLX4X root dir \"${PLC4X_ROOT_DIR}\" is not a release mgmt clone!"
+    echo "Warning: the PLC4X root dir \"${PLC4X_ROOT_DIR}\" is not a release mgmt clone!"
     return 1
   else
     return 0
@@ -147,14 +147,14 @@ function getReleaseProperty {  # <property-name>
   echo ${VAL}
 }
 
-function getPLX4XVer() {  # [$1 == "bundle"]
-  MSG="getPLX4XVer(): unknown mode \"$1\""
+function getPLC4XVer() {  # [$1 == "bundle"]
+  MSG="getPLC4XVer(): unknown mode \"$1\""
   VER=""
   if [ "$1" == "" ]; then
     VER=`getReleaseProperty releaseNum`
     MSG="Unable to identify the release version id from ${RELEASE_PROP_FILE}"
   elif [ $1 == "gradle" ]; then
-    die "'getPLX4XVer() gradle' is no longer supported"
+    die "'getPLC4XVer() gradle' is no longer supported"
     # Get the X.Y.Z version from gradle build info
     PROPS=${PLC4X_ROOT_DIR}/gradle.properties
     VER=`grep build_version ${PROPS} | grep -o -E '\d+\.\d+\.\d+'`
@@ -198,5 +198,5 @@ function getReleaseTagComment() {  # $1: X.Y.Z  [$2: rc-num]
   if [ $# -gt 0 ] && [ "$1" != "" ]; then
     RC_SFX=" RC$1"
   fi
-  echo "Apache PLX4X ${VER}-incubating${RC_SFX}"
+  echo "Apache PLC4X ${VER}-incubating${RC_SFX}"
 }