You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by hu...@apache.org on 2022/06/26 17:43:24 UTC
[plc4x] 02/03: Split the write method into seperate methods based on if connection manager is available.
This is an automated email from the ASF dual-hosted git repository.
hutcheb pushed a commit to branch eip_update
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 405a360a8f922529662e6ebc3e6929d6c06b6cbd
Author: Ben Hutcheson <be...@gmail.com>
AuthorDate: Mon Jun 27 03:21:55 2022 +1000
Split the write method into seperate methods based on if connection manager is available.
---
.../java/eip/base/protocol/EipProtocolLogic.java | 278 +++++++++++++++++++--
1 file changed, 252 insertions(+), 26 deletions(-)
diff --git a/plc4j/drivers/eip/src/main/java/org/apache/plc4x/java/eip/base/protocol/EipProtocolLogic.java b/plc4j/drivers/eip/src/main/java/org/apache/plc4x/java/eip/base/protocol/EipProtocolLogic.java
index 01dd54698..32c36f9c0 100644
--- a/plc4j/drivers/eip/src/main/java/org/apache/plc4x/java/eip/base/protocol/EipProtocolLogic.java
+++ b/plc4j/drivers/eip/src/main/java/org/apache/plc4x/java/eip/base/protocol/EipProtocolLogic.java
@@ -156,7 +156,7 @@ public class EipProtocolLogic extends Plc4xProtocolBase<EipPacket> implements Ha
this.configuration.getByteOrder()
);
- List<TypeId> typeIds = new ArrayList<TypeId>() {
+ List<TypeId> typeIds = new ArrayList<>() {
{
add(nullAddressItem);
add(exchange);
@@ -284,7 +284,7 @@ public class EipProtocolLogic extends Plc4xProtocolBase<EipPacket> implements Ha
this.configuration.getByteOrder()
);
- List<TypeId> typeIds = new ArrayList<TypeId>() {
+ List<TypeId> typeIds = new ArrayList<>() {
{
add(nullAddressItem);
add(exchange);
@@ -355,7 +355,7 @@ public class EipProtocolLogic extends Plc4xProtocolBase<EipPacket> implements Ha
this.configuration.getByteOrder()
);
- List<TypeId> typeIds = new ArrayList<TypeId>() {
+ List<TypeId> typeIds = new ArrayList<>() {
{
add(nullAddressItem);
add(exchange);
@@ -495,10 +495,7 @@ public class EipProtocolLogic extends Plc4xProtocolBase<EipPacket> implements Ha
for (PlcField field : request.getFields()) {
EipField plcField = (EipField) field;
String tag = plcField.getTag();
- int elements = 1;
- if (plcField.getElementNb() > 1) {
- elements = plcField.getElementNb();
- }
+
try {
CipReadRequest req = new CipReadRequest(
toAnsi(tag, this.configuration.getByteOrder()),
@@ -506,9 +503,7 @@ public class EipProtocolLogic extends Plc4xProtocolBase<EipPacket> implements Ha
-1,
this.configuration.getByteOrder());
- CipUnconnectedRequest unreq;
-
- unreq = new CipUnconnectedRequest(
+ CipUnconnectedRequest unreq = new CipUnconnectedRequest(
classSegment,
instanceSegment,
req,
@@ -572,23 +567,17 @@ public class EipProtocolLogic extends Plc4xProtocolBase<EipPacket> implements Ha
return future;
}
-
private CompletableFuture<PlcReadResponse> readWithConnectionManager(PlcReadRequest readRequest) {
CompletableFuture<PlcReadResponse> future = new CompletableFuture<>();
RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
- PathSegment classSegment = new LogicalSegment(new ClassID((byte) 0, (short) 6, this.configuration.getByteOrder()), this.configuration.getByteOrder());
- PathSegment instanceSegment = new LogicalSegment(new InstanceID((byte) 0, (short) 1, this.configuration.getByteOrder()), this.configuration.getByteOrder());
-
DefaultPlcReadRequest request = (DefaultPlcReadRequest) readRequest;
List<CipService> requests = new ArrayList<>(request.getNumberOfFields());
+
for (PlcField field : request.getFields()) {
EipField plcField = (EipField) field;
String tag = plcField.getTag();
- int elements = 1;
- if (plcField.getElementNb() > 1) {
- elements = plcField.getElementNb();
- }
+
try {
CipReadRequest req = new CipReadRequest(
toAnsi(tag, this.configuration.getByteOrder()),
@@ -603,7 +592,7 @@ public class EipProtocolLogic extends Plc4xProtocolBase<EipPacket> implements Ha
}
List<TypeId> typeIds =new ArrayList<>(2);
-
+ typeIds.add(nullAddressItem);
typeIds.add(new ConnectedAddressItem(this.connectionId, this.configuration.getByteOrder()));
if (requests.size() == 1) {
typeIds.add(new ConnectedDataItem(this.sequenceCount, requests.get(0), this.configuration.getByteOrder()));
@@ -631,6 +620,8 @@ public class EipProtocolLogic extends Plc4xProtocolBase<EipPacket> implements Ha
this.configuration.getByteOrder()
);
+ this.sequenceCount += 1;
+
transaction.submit(() -> context.sendRequest(pkt)
.expectResponse(EipPacket.class, REQUEST_TIMEOUT)
.onTimeout(future::completeExceptionally)
@@ -882,8 +873,96 @@ public class EipProtocolLogic extends Plc4xProtocolBase<EipPacket> implements Ha
return Float.intBitsToFloat(b1 << 24 | b2 << 16 | b3 << 8 | b4);
}
- @Override
- public CompletableFuture<PlcWriteResponse> write(PlcWriteRequest writeRequest) {
+ public CompletableFuture<PlcWriteResponse> writeWithoutMessageRouter(PlcWriteRequest writeRequest) {
+ CompletableFuture<PlcWriteResponse> future = new CompletableFuture<>();
+ DefaultPlcWriteRequest request = (DefaultPlcWriteRequest) writeRequest;
+ List<CipWriteRequest> items = new ArrayList<>(writeRequest.getNumberOfFields());
+ PathSegment classSegment = new LogicalSegment(new ClassID((byte) 0, (short) 6, this.configuration.getByteOrder()), this.configuration.getByteOrder());
+ PathSegment instanceSegment = new LogicalSegment(new InstanceID((byte) 0, (short) 1, this.configuration.getByteOrder()), this.configuration.getByteOrder());
+ Map<String, PlcResponseCode> values = new HashMap<>();
+
+ for (String fieldName : writeRequest.getFieldNames()) {
+ final EipField field = (EipField) request.getField(fieldName);
+ final PlcValue value = request.getPlcValue(fieldName);
+ String tag = field.getTag();
+ int elements = 1;
+ if (field.getElementNb() > 1) {
+ elements = field.getElementNb();
+ }
+
+ byte[] data = encodeValue(value, field.getType(), (short) elements);
+ CipWriteRequest writeReq = null;
+ try {
+ writeReq = new CipWriteRequest(toAnsi(tag, this.configuration.getByteOrder()), field.getType(), elements, data, -1, this.configuration.getByteOrder());
+ } catch (SerializationException e) {
+ e.printStackTrace();
+ }
+ CompletableFuture<Boolean> internalFuture = new CompletableFuture<>();
+ RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
+
+ tm.startRequest();
+
+ UnConnectedDataItem exchange = new UnConnectedDataItem(
+ new CipUnconnectedRequest(
+ classSegment,
+ instanceSegment,
+ writeReq,
+ (byte) configuration.getBackplane(),
+ (byte) configuration.getSlot(),
+ -1,
+ this.configuration.getByteOrder()
+ ),
+ this.configuration.getByteOrder()
+ );
+
+ List<TypeId> typeIds = new ArrayList<>() {
+ {
+ add(nullAddressItem);
+ add(exchange);
+ }
+ };
+
+ CipRRData rrdata = new CipRRData(
+ sessionHandle,
+ 0L,
+ senderContext,
+ 0L,
+ EMPTY_INTERFACE_HANDLE,
+ 0,
+ 2,
+ typeIds,
+ this.configuration.getByteOrder()
+ );
+
+ transaction.submit(() -> context.sendRequest(rrdata)
+ .expectResponse(EipPacket.class, REQUEST_TIMEOUT)
+ .onTimeout(future::completeExceptionally)
+ .onError((p, e) -> future.completeExceptionally(e))
+ .check(p -> p instanceof CipRRData).unwrap(p -> (CipRRData) p)
+ .check(p -> p.getSessionHandle() == sessionHandle)
+ //.check(p -> p.getSenderContext() == senderContext)
+ .check(p -> ((UnConnectedDataItem) p.getTypeId().get(1)).getService() instanceof CipWriteResponse)
+ .unwrap(p -> (CipWriteResponse) ((UnConnectedDataItem) p.getTypeId().get(1)).getService())
+ .handle(p -> {
+ Map<String, PlcResponseCode> responseItem = decodeSingleWriteResponse(p, fieldName, field);
+ values.putAll(responseItem);
+ internalFuture.complete(true);
+ transaction.endRequest();
+ })
+ );
+ try {
+ internalFuture.get(REQUEST_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ future.completeExceptionally(new PlcRuntimeException("Failed to read field"));
+ }
+
+ }
+ PlcWriteResponse response = new DefaultPlcWriteResponse(writeRequest, values);
+ future.complete(response);
+ return future;
+ }
+
+ public CompletableFuture<PlcWriteResponse> writeWithoutConnectionManager(PlcWriteRequest writeRequest) {
CompletableFuture<PlcWriteResponse> future = new CompletableFuture<>();
DefaultPlcWriteRequest request = (DefaultPlcWriteRequest) writeRequest;
List<CipWriteRequest> items = new ArrayList<>(writeRequest.getNumberOfFields());
@@ -927,7 +1006,7 @@ public class EipProtocolLogic extends Plc4xProtocolBase<EipPacket> implements Ha
this.configuration.getByteOrder()
);
- List<TypeId> typeIds = new ArrayList<TypeId>() {
+ List<TypeId> typeIds = new ArrayList<>() {
{
add(nullAddressItem);
add(exchange);
@@ -953,8 +1032,8 @@ public class EipProtocolLogic extends Plc4xProtocolBase<EipPacket> implements Ha
.check(p -> p instanceof CipRRData).unwrap(p -> (CipRRData) p)
.check(p -> p.getSessionHandle() == sessionHandle)
//.check(p -> p.getSenderContext() == senderContext)
- .check(p -> ((UnConnectedDataItem) p.getTypeId().get(0)).getService() instanceof CipWriteResponse)
- .unwrap(p -> (CipWriteResponse) ((UnConnectedDataItem) p.getTypeId().get(0)).getService())
+ .check(p -> ((UnConnectedDataItem) p.getTypeId().get(1)).getService() instanceof CipWriteResponse)
+ .unwrap(p -> (CipWriteResponse) ((UnConnectedDataItem) p.getTypeId().get(1)).getService())
.handle(p -> {
future.complete((PlcWriteResponse) decodeWriteResponse(p, writeRequest));
transaction.endRequest();
@@ -993,7 +1072,7 @@ public class EipProtocolLogic extends Plc4xProtocolBase<EipPacket> implements Ha
this.configuration.getByteOrder()
);
- List<TypeId> typeIds = new ArrayList<TypeId>() {
+ List<TypeId> typeIds = new ArrayList<>() {
{
add(nullAddressItem);
add(exchange);
@@ -1020,7 +1099,7 @@ public class EipProtocolLogic extends Plc4xProtocolBase<EipPacket> implements Ha
.check(p -> p.getSessionHandle() == sessionHandle)
//.check(p -> p.getSenderContext() == senderContext)
.unwrap(p -> (CipRRData) p)
- .unwrap(p -> ((UnConnectedDataItem) p.getTypeId().get(0)).getService()).check(p -> p instanceof MultipleServiceResponse)
+ .unwrap(p -> ((UnConnectedDataItem) p.getTypeId().get(1)).getService()).check(p -> p instanceof MultipleServiceResponse)
.unwrap(p -> (MultipleServiceResponse) p)
.check(p -> p.getServiceNb() == nb)
.handle(p -> {
@@ -1032,6 +1111,153 @@ public class EipProtocolLogic extends Plc4xProtocolBase<EipPacket> implements Ha
return future;
}
+ public CompletableFuture<PlcWriteResponse> writeWithConnectionManager(PlcWriteRequest writeRequest) {
+ CompletableFuture<PlcWriteResponse> future = new CompletableFuture<>();
+ DefaultPlcWriteRequest request = (DefaultPlcWriteRequest) writeRequest;
+ List<CipWriteRequest> items = new ArrayList<>(writeRequest.getNumberOfFields());
+ for (String fieldName : request.getFieldNames()) {
+ final EipField field = (EipField) request.getField(fieldName);
+ final PlcValue value = request.getPlcValue(fieldName);
+ String tag = field.getTag();
+ int elements = 1;
+ if (field.getElementNb() > 1) {
+ elements = field.getElementNb();
+ }
+
+ byte[] data = encodeValue(value, field.getType(), (short) elements);
+ try {
+ CipWriteRequest writeReq = new CipWriteRequest(toAnsi(tag, this.configuration.getByteOrder()), field.getType(), elements, data, -1, this.configuration.getByteOrder());
+ items.add(writeReq);
+ } catch (SerializationException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
+ if (items.size() == 1) {
+ tm.startRequest();
+
+ ConnectedDataItem exchange = new ConnectedDataItem(
+ this.sequenceCount,
+ items.get(0),
+ this.configuration.getByteOrder()
+ );
+
+ List<TypeId> typeIds = new ArrayList<>() {
+ {
+ add(nullAddressItem);
+ add(exchange);
+ }
+ };
+
+ SendUnitData rrdata = new SendUnitData(
+ sessionHandle,
+ CIPStatus.Success.getValue(),
+ senderContext,
+ 0L,
+ 0,
+ 2,
+ typeIds,
+ this.configuration.getByteOrder()
+ );
+
+ transaction.submit(() -> context.sendRequest(rrdata)
+ .expectResponse(EipPacket.class, REQUEST_TIMEOUT)
+ .onTimeout(future::completeExceptionally)
+ .onError((p, e) -> future.completeExceptionally(e))
+ .check(p -> p instanceof SendUnitData).unwrap(p -> (SendUnitData) p)
+ .check(p -> p.getSessionHandle() == sessionHandle)
+ .check(p -> ((ConnectedDataItem) p.getTypeId().get(1)).getService() instanceof CipWriteResponse)
+ .unwrap(p -> (CipWriteResponse) ((ConnectedDataItem) p.getTypeId().get(1)).getService())
+ .handle(p -> {
+ future.complete((PlcWriteResponse) decodeWriteResponse(p, writeRequest));
+ transaction.endRequest();
+ })
+ );
+ } else {
+ tm.startRequest();
+ short nb = (short) items.size();
+ List<Integer> offsets = new ArrayList<>(nb);
+ int offset = 2 + nb * 2;
+ for (int i = 0; i < nb; i++) {
+ offsets.add(offset);
+ offset += items.get(i).getLengthInBytes();
+ }
+
+ List<CipService> serviceArr = new ArrayList<>(nb);
+ for (int i = 0; i < nb; i++) {
+ serviceArr.add(items.get(i));
+ }
+ Services data = new Services(offsets, serviceArr, -1, this.configuration.getByteOrder());
+ //Encapsulate the data
+
+ PathSegment classSegment = new LogicalSegment(new ClassID((byte) 0, (short) 6, this.configuration.getByteOrder()), this.configuration.getByteOrder());
+ PathSegment instanceSegment = new LogicalSegment(new InstanceID((byte) 0, (short) 1, this.configuration.getByteOrder()), this.configuration.getByteOrder());
+
+ ConnectedDataItem exchange = new ConnectedDataItem(
+ this.sequenceCount,
+ new MultipleServiceRequest(data, -1, this.configuration.getByteOrder()),
+ this.configuration.getByteOrder()
+ );
+
+ List<TypeId> typeIds = new ArrayList<>() {
+ {
+ add(nullAddressItem);
+ add(exchange);
+ }
+ };
+
+ SendUnitData pkt = new SendUnitData(
+ sessionHandle,
+ 0L,
+ DEFAULT_SENDER_CONTEXT,
+ 0L,
+ 0,
+ 2,
+ typeIds,
+ this.configuration.getByteOrder()
+ );
+
+ transaction.submit(() -> context.sendRequest(pkt)
+ .expectResponse(EipPacket.class, REQUEST_TIMEOUT)
+ .onTimeout(future::completeExceptionally)
+ .onError((p, e) -> future.completeExceptionally(e))
+ .check(p -> p instanceof SendUnitData)
+ .check(p -> p.getSessionHandle() == sessionHandle)
+ //.check(p -> p.getSenderContext() == senderContext)
+ .unwrap(p -> (SendUnitData) p)
+ .unwrap(p -> ((ConnectedDataItem) p.getTypeId().get(1)).getService()).check(p -> p instanceof MultipleServiceResponse)
+ .unwrap(p -> (MultipleServiceResponse) p)
+ .check(p -> p.getServiceNb() == nb)
+ .handle(p -> {
+ future.complete((PlcWriteResponse) decodeWriteResponse(p, writeRequest));
+ // Finish the request-transaction.
+ transaction.endRequest();
+ }));
+ }
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<PlcWriteResponse> write(PlcWriteRequest writeRequest) {
+ CompletableFuture<PlcWriteResponse> future;
+ if (!this.useMessageRouter && !this.useConnectionManager) {
+ future = writeWithoutMessageRouter(writeRequest);
+ } else if (this.useMessageRouter && !this.useConnectionManager) {
+ future = writeWithoutConnectionManager(writeRequest);
+ } else {
+ future = writeWithConnectionManager(writeRequest);
+ }
+ return future;
+ }
+
+ private Map<String, PlcResponseCode> decodeSingleWriteResponse(CipWriteResponse resp, String fieldName, PlcField field) {
+ Map<String, PlcResponseCode> responses = new HashMap<>();
+ responses.put(fieldName, decodeResponseCode(resp.getStatus()));
+ return responses;
+ }
+
private PlcResponse decodeWriteResponse(CipService p, PlcWriteRequest writeRequest) {
Map<String, PlcResponseCode> responses = new HashMap<>();