You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by hu...@apache.org on 2022/06/26 17:43:24 UTC

[plc4x] 02/03: Split the write method into separate methods based on if connection manager is available.

This is an automated email from the ASF dual-hosted git repository.

hutcheb pushed a commit to branch eip_update
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 405a360a8f922529662e6ebc3e6929d6c06b6cbd
Author: Ben Hutcheson <be...@gmail.com>
AuthorDate: Mon Jun 27 03:21:55 2022 +1000

    Split the write method into separate methods based on if connection manager is available.
---
 .../java/eip/base/protocol/EipProtocolLogic.java   | 278 +++++++++++++++++++--
 1 file changed, 252 insertions(+), 26 deletions(-)

diff --git a/plc4j/drivers/eip/src/main/java/org/apache/plc4x/java/eip/base/protocol/EipProtocolLogic.java b/plc4j/drivers/eip/src/main/java/org/apache/plc4x/java/eip/base/protocol/EipProtocolLogic.java
index 01dd54698..32c36f9c0 100644
--- a/plc4j/drivers/eip/src/main/java/org/apache/plc4x/java/eip/base/protocol/EipProtocolLogic.java
+++ b/plc4j/drivers/eip/src/main/java/org/apache/plc4x/java/eip/base/protocol/EipProtocolLogic.java
@@ -156,7 +156,7 @@ public class EipProtocolLogic extends Plc4xProtocolBase<EipPacket> implements Ha
             this.configuration.getByteOrder()
         );
 
-        List<TypeId> typeIds = new ArrayList<TypeId>() {
+        List<TypeId> typeIds = new ArrayList<>() {
             {
                 add(nullAddressItem);
                 add(exchange);
@@ -284,7 +284,7 @@ public class EipProtocolLogic extends Plc4xProtocolBase<EipPacket> implements Ha
             this.configuration.getByteOrder()
         );
 
-        List<TypeId> typeIds = new ArrayList<TypeId>() {
+        List<TypeId> typeIds = new ArrayList<>() {
             {
                 add(nullAddressItem);
                 add(exchange);
@@ -355,7 +355,7 @@ public class EipProtocolLogic extends Plc4xProtocolBase<EipPacket> implements Ha
                 this.configuration.getByteOrder()
             );
 
-            List<TypeId> typeIds = new ArrayList<TypeId>() {
+            List<TypeId> typeIds = new ArrayList<>() {
                 {
                     add(nullAddressItem);
                     add(exchange);
@@ -495,10 +495,7 @@ public class EipProtocolLogic extends Plc4xProtocolBase<EipPacket> implements Ha
         for (PlcField field : request.getFields()) {
             EipField plcField = (EipField) field;
             String tag = plcField.getTag();
-            int elements = 1;
-            if (plcField.getElementNb() > 1) {
-                elements = plcField.getElementNb();
-            }
+
             try {
                 CipReadRequest req = new CipReadRequest(
                     toAnsi(tag, this.configuration.getByteOrder()),
@@ -506,9 +503,7 @@ public class EipProtocolLogic extends Plc4xProtocolBase<EipPacket> implements Ha
                     -1,
                     this.configuration.getByteOrder());
 
-                CipUnconnectedRequest unreq;
-
-                unreq = new CipUnconnectedRequest(
+                CipUnconnectedRequest unreq = new CipUnconnectedRequest(
                     classSegment,
                     instanceSegment,
                     req,
@@ -572,23 +567,17 @@ public class EipProtocolLogic extends Plc4xProtocolBase<EipPacket> implements Ha
         return future;
     }
 
-
     private CompletableFuture<PlcReadResponse> readWithConnectionManager(PlcReadRequest readRequest) {
         CompletableFuture<PlcReadResponse> future = new CompletableFuture<>();
         RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
 
-        PathSegment classSegment = new LogicalSegment(new ClassID((byte) 0, (short) 6, this.configuration.getByteOrder()), this.configuration.getByteOrder());
-        PathSegment instanceSegment = new LogicalSegment(new InstanceID((byte) 0, (short) 1, this.configuration.getByteOrder()), this.configuration.getByteOrder());
-
         DefaultPlcReadRequest request = (DefaultPlcReadRequest) readRequest;
         List<CipService> requests = new ArrayList<>(request.getNumberOfFields());
+
         for (PlcField field : request.getFields()) {
             EipField plcField = (EipField) field;
             String tag = plcField.getTag();
-            int elements = 1;
-            if (plcField.getElementNb() > 1) {
-                elements = plcField.getElementNb();
-            }
+
             try {
                 CipReadRequest req = new CipReadRequest(
                     toAnsi(tag, this.configuration.getByteOrder()),
@@ -603,7 +592,7 @@ public class EipProtocolLogic extends Plc4xProtocolBase<EipPacket> implements Ha
         }
 
         List<TypeId> typeIds =new ArrayList<>(2);
-
+        typeIds.add(nullAddressItem);
         typeIds.add(new ConnectedAddressItem(this.connectionId, this.configuration.getByteOrder()));
         if (requests.size() == 1) {
             typeIds.add(new ConnectedDataItem(this.sequenceCount, requests.get(0), this.configuration.getByteOrder()));
@@ -631,6 +620,8 @@ public class EipProtocolLogic extends Plc4xProtocolBase<EipPacket> implements Ha
             this.configuration.getByteOrder()
         );
 
+        this.sequenceCount += 1;
+
         transaction.submit(() -> context.sendRequest(pkt)
             .expectResponse(EipPacket.class, REQUEST_TIMEOUT)
             .onTimeout(future::completeExceptionally)
@@ -882,8 +873,96 @@ public class EipProtocolLogic extends Plc4xProtocolBase<EipPacket> implements Ha
         return Float.intBitsToFloat(b1 << 24 | b2 << 16 | b3 << 8 | b4);
     }
 
-    @Override
-    public CompletableFuture<PlcWriteResponse> write(PlcWriteRequest writeRequest) {
+    /**
+     * Writes each field of the request as its own unconnected CIP write and collects the per-field response codes.
+     * <p>
+     * Fields are processed strictly one after another: the calling thread blocks, for at most
+     * {@code REQUEST_TIMEOUT} per field, until the reply for the current field has been handled.
+     *
+     * @param writeRequest request to execute; cast to {@link DefaultPlcWriteRequest}, its fields to {@link EipField}
+     * @return a future that is already completed on return, with the response codes or exceptionally
+     */
+    public CompletableFuture<PlcWriteResponse> writeWithoutMessageRouter(PlcWriteRequest writeRequest) {
+        CompletableFuture<PlcWriteResponse> future = new CompletableFuture<>();
+        DefaultPlcWriteRequest request = (DefaultPlcWriteRequest) writeRequest;
+        PathSegment classSegment = new LogicalSegment(new ClassID((byte) 0, (short) 6, this.configuration.getByteOrder()), this.configuration.getByteOrder());
+        PathSegment instanceSegment = new LogicalSegment(new InstanceID((byte) 0, (short) 1, this.configuration.getByteOrder()), this.configuration.getByteOrder());
+        Map<String, PlcResponseCode> values = new HashMap<>();
+
+        for (String fieldName : writeRequest.getFieldNames()) {
+            final EipField field = (EipField) request.getField(fieldName);
+            final PlcValue value = request.getPlcValue(fieldName);
+            // An element count below one is treated as a single element.
+            final int elements = Math.max(field.getElementNb(), 1);
+            final byte[] data = encodeValue(value, field.getType(), (short) elements);
+
+            final CipWriteRequest writeReq;
+            try {
+                writeReq = new CipWriteRequest(toAnsi(field.getTag(), this.configuration.getByteOrder()), field.getType(), elements, data, -1, this.configuration.getByteOrder());
+            } catch (SerializationException e) {
+                // Without a serialized request there is nothing meaningful to send, so fail the whole write.
+                future.completeExceptionally(new PlcRuntimeException("Failed to serialize write request for field " + fieldName, e));
+                return future;
+            }
+
+            CompletableFuture<Boolean> internalFuture = new CompletableFuture<>();
+            // One transaction per field; the response handler below ends it.
+            RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
+
+            UnConnectedDataItem exchange = new UnConnectedDataItem(
+                new CipUnconnectedRequest(
+                    classSegment,
+                    instanceSegment,
+                    writeReq,
+                    (byte) configuration.getBackplane(),
+                    (byte) configuration.getSlot(),
+                    -1,
+                    this.configuration.getByteOrder()
+                ),
+                this.configuration.getByteOrder()
+            );
+
+            List<TypeId> typeIds = new ArrayList<>(2);
+            typeIds.add(nullAddressItem);
+            typeIds.add(exchange);
+
+            CipRRData rrdata = new CipRRData(
+                sessionHandle, 0L, senderContext, 0L, EMPTY_INTERFACE_HANDLE, 0, 2, typeIds,
+                this.configuration.getByteOrder()
+            );
+
+            transaction.submit(() -> context.sendRequest(rrdata)
+                .expectResponse(EipPacket.class, REQUEST_TIMEOUT)
+                .onTimeout(future::completeExceptionally)
+                .onError((p, e) -> future.completeExceptionally(e))
+                .check(p -> p instanceof CipRRData).unwrap(p -> (CipRRData) p)
+                .check(p -> p.getSessionHandle() == sessionHandle)
+                //.check(p -> p.getSenderContext() == senderContext)
+                .check(p -> ((UnConnectedDataItem) p.getTypeId().get(1)).getService() instanceof CipWriteResponse)
+                .unwrap(p -> (CipWriteResponse) ((UnConnectedDataItem) p.getTypeId().get(1)).getService())
+                .handle(p -> {
+                    Map<String, PlcResponseCode> responseItem = decodeSingleWriteResponse(p, fieldName, field);
+                    values.putAll(responseItem);
+                    internalFuture.complete(true);
+                    transaction.endRequest();
+                })
+            );
+            try {
+                internalFuture.get(REQUEST_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
+                future.completeExceptionally(new PlcRuntimeException("Failed to write field " + fieldName, e));
+                return future;
+            } catch (ExecutionException | TimeoutException e) {
+                future.completeExceptionally(new PlcRuntimeException("Failed to write field " + fieldName, e));
+            }
+        }
+        PlcWriteResponse response = new DefaultPlcWriteResponse(writeRequest, values);
+        future.complete(response);
+        return future;
+    }
+
+    public CompletableFuture<PlcWriteResponse> writeWithoutConnectionManager(PlcWriteRequest writeRequest) {
         CompletableFuture<PlcWriteResponse> future = new CompletableFuture<>();
         DefaultPlcWriteRequest request = (DefaultPlcWriteRequest) writeRequest;
         List<CipWriteRequest> items = new ArrayList<>(writeRequest.getNumberOfFields());
@@ -927,7 +1006,7 @@ public class EipProtocolLogic extends Plc4xProtocolBase<EipPacket> implements Ha
                 this.configuration.getByteOrder()
             );
 
-            List<TypeId> typeIds = new ArrayList<TypeId>() {
+            List<TypeId> typeIds = new ArrayList<>() {
                 {
                     add(nullAddressItem);
                     add(exchange);
@@ -953,8 +1032,8 @@ public class EipProtocolLogic extends Plc4xProtocolBase<EipPacket> implements Ha
                 .check(p -> p instanceof CipRRData).unwrap(p -> (CipRRData) p)
                 .check(p -> p.getSessionHandle() == sessionHandle)
                 //.check(p -> p.getSenderContext() == senderContext)
-                .check(p -> ((UnConnectedDataItem) p.getTypeId().get(0)).getService() instanceof CipWriteResponse)
-                .unwrap(p -> (CipWriteResponse) ((UnConnectedDataItem) p.getTypeId().get(0)).getService())
+                .check(p -> ((UnConnectedDataItem) p.getTypeId().get(1)).getService() instanceof CipWriteResponse)
+                .unwrap(p -> (CipWriteResponse) ((UnConnectedDataItem) p.getTypeId().get(1)).getService())
                 .handle(p -> {
                     future.complete((PlcWriteResponse) decodeWriteResponse(p, writeRequest));
                     transaction.endRequest();
@@ -993,7 +1072,7 @@ public class EipProtocolLogic extends Plc4xProtocolBase<EipPacket> implements Ha
                 this.configuration.getByteOrder()
             );
 
-            List<TypeId> typeIds = new ArrayList<TypeId>() {
+            List<TypeId> typeIds = new ArrayList<>() {
                 {
                     add(nullAddressItem);
                     add(exchange);
@@ -1020,7 +1099,7 @@ public class EipProtocolLogic extends Plc4xProtocolBase<EipPacket> implements Ha
                 .check(p -> p.getSessionHandle() == sessionHandle)
                 //.check(p -> p.getSenderContext() == senderContext)
                 .unwrap(p -> (CipRRData) p)
-                .unwrap(p -> ((UnConnectedDataItem) p.getTypeId().get(0)).getService()).check(p -> p instanceof MultipleServiceResponse)
+                .unwrap(p -> ((UnConnectedDataItem) p.getTypeId().get(1)).getService()).check(p -> p instanceof MultipleServiceResponse)
                 .unwrap(p -> (MultipleServiceResponse) p)
                 .check(p -> p.getServiceNb() == nb)
                 .handle(p -> {
@@ -1032,6 +1111,153 @@ public class EipProtocolLogic extends Plc4xProtocolBase<EipPacket> implements Ha
         return future;
     }
 
+    /**
+     * Writes all fields of the request in one {@link SendUnitData} packet carrying a {@link ConnectedDataItem}.
+     * <p>
+     * A request with exactly one field is sent as a plain {@link CipWriteRequest}; any other number of fields is
+     * bundled into a single {@link MultipleServiceRequest}.
+     *
+     * @param writeRequest request to execute; cast to {@link DefaultPlcWriteRequest}, its fields to {@link EipField}
+     * @return a future completed with the decoded write response, or exceptionally on timeout, on error or
+     *         when a field cannot be serialized
+     */
+    public CompletableFuture<PlcWriteResponse> writeWithConnectionManager(PlcWriteRequest writeRequest) {
+        CompletableFuture<PlcWriteResponse> future = new CompletableFuture<>();
+        DefaultPlcWriteRequest request = (DefaultPlcWriteRequest) writeRequest;
+        List<CipWriteRequest> items = new ArrayList<>(writeRequest.getNumberOfFields());
+        for (String fieldName : request.getFieldNames()) {
+            final EipField field = (EipField) request.getField(fieldName);
+            final PlcValue value = request.getPlcValue(fieldName);
+            // An element count below one is treated as a single element.
+            final int elements = Math.max(field.getElementNb(), 1);
+            final byte[] data = encodeValue(value, field.getType(), (short) elements);
+
+            try {
+                CipWriteRequest writeReq = new CipWriteRequest(toAnsi(field.getTag(), this.configuration.getByteOrder()), field.getType(), elements, data, -1, this.configuration.getByteOrder());
+                items.add(writeReq);
+            } catch (SerializationException e) {
+                // A silently dropped field would never be written, so fail the whole write instead.
+                future.completeExceptionally(new PlcRuntimeException("Failed to serialize write request for field " + fieldName, e));
+                return future;
+            }
+        }
+
+        // One transaction for the single packet that is sent below.
+        RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
+        if (items.size() == 1) {
+            ConnectedDataItem exchange = new ConnectedDataItem(
+                this.sequenceCount,
+                items.get(0),
+                this.configuration.getByteOrder()
+            );
+
+            // NOTE(review): readWithConnectionManager also adds a ConnectedAddressItem(connectionId) - confirm whether it is needed here.
+            List<TypeId> typeIds = new ArrayList<>(2);
+            typeIds.add(nullAddressItem);
+            typeIds.add(exchange);
+
+            SendUnitData rrdata = new SendUnitData(
+                sessionHandle,
+                CIPStatus.Success.getValue(),
+                senderContext,
+                0L,
+                0,
+                2,
+                typeIds,
+                this.configuration.getByteOrder()
+            );
+
+            transaction.submit(() -> context.sendRequest(rrdata)
+                .expectResponse(EipPacket.class, REQUEST_TIMEOUT)
+                .onTimeout(future::completeExceptionally)
+                .onError((p, e) -> future.completeExceptionally(e))
+                .check(p -> p instanceof SendUnitData).unwrap(p -> (SendUnitData) p)
+                .check(p -> p.getSessionHandle() == sessionHandle)
+                .check(p -> ((ConnectedDataItem) p.getTypeId().get(1)).getService() instanceof CipWriteResponse)
+                .unwrap(p -> (CipWriteResponse) ((ConnectedDataItem) p.getTypeId().get(1)).getService())
+                .handle(p -> {
+                    future.complete((PlcWriteResponse) decodeWriteResponse(p, writeRequest));
+                    transaction.endRequest();
+                })
+            );
+        } else {
+            short nb = (short) items.size();
+            List<Integer> offsets = new ArrayList<>(nb);
+            // First service starts at 2 + 2 * nb (presumably the service count plus one 16-bit offset per service).
+            int offset = 2 + nb * 2;
+            for (int i = 0; i < nb; i++) {
+                offsets.add(offset);
+                offset += items.get(i).getLengthInBytes();
+            }
+
+            List<CipService> serviceArr = new ArrayList<>(items);
+            Services data = new Services(offsets, serviceArr, -1, this.configuration.getByteOrder());
+            // Encapsulate the data
+
+            ConnectedDataItem exchange = new ConnectedDataItem(
+                this.sequenceCount,
+                new MultipleServiceRequest(data, -1, this.configuration.getByteOrder()),
+                this.configuration.getByteOrder()
+            );
+
+            List<TypeId> typeIds = new ArrayList<>(2);
+            typeIds.add(nullAddressItem);
+            typeIds.add(exchange);
+
+            SendUnitData pkt = new SendUnitData(
+                sessionHandle,
+                0L,
+                DEFAULT_SENDER_CONTEXT,
+                0L,
+                0,
+                2,
+                typeIds,
+                this.configuration.getByteOrder()
+            );
+
+            transaction.submit(() -> context.sendRequest(pkt)
+                .expectResponse(EipPacket.class, REQUEST_TIMEOUT)
+                .onTimeout(future::completeExceptionally)
+                .onError((p, e) -> future.completeExceptionally(e))
+                .check(p -> p instanceof SendUnitData)
+                .check(p -> p.getSessionHandle() == sessionHandle)
+                //.check(p -> p.getSenderContext() == senderContext)
+                .unwrap(p -> (SendUnitData) p)
+                .unwrap(p -> ((ConnectedDataItem) p.getTypeId().get(1)).getService()).check(p -> p instanceof MultipleServiceResponse)
+                .unwrap(p -> (MultipleServiceResponse) p)
+                .check(p -> p.getServiceNb() == nb)
+                .handle(p -> {
+                    future.complete((PlcWriteResponse) decodeWriteResponse(p, writeRequest));
+                    // Finish the request-transaction.
+                    transaction.endRequest();
+                }));
+        }
+
+        // Advance the sequence count so the next connected request carries a fresh value,
+        // matching what readWithConnectionManager does after building its packet.
+        this.sequenceCount += 1;
+        return future;
+    }
+
+    /** Routes the write to the variant selected by the useConnectionManager / useMessageRouter flags. */
+    @Override
+    public CompletableFuture<PlcWriteResponse> write(PlcWriteRequest writeRequest) {
+        // With the connection manager enabled the connected path is taken, whatever the router flag says.
+        if (this.useConnectionManager) {
+            return writeWithConnectionManager(writeRequest);
+        }
+        // Otherwise the message-router flag picks between the two unconnected variants.
+        return this.useMessageRouter
+            ? writeWithoutConnectionManager(writeRequest)
+            : writeWithoutMessageRouter(writeRequest);
+    }
+
+    private Map<String, PlcResponseCode> decodeSingleWriteResponse(CipWriteResponse resp, String fieldName, PlcField field) {
+        Map<String, PlcResponseCode> responses = new HashMap<>();
+        responses.put(fieldName, decodeResponseCode(resp.getStatus()));
+        return responses;
+    }
+
     private PlcResponse decodeWriteResponse(CipService p, PlcWriteRequest writeRequest) {
         Map<String, PlcResponseCode> responses = new HashMap<>();