You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by ld...@apache.org on 2020/10/09 09:49:13 UTC
[plc4x] branch feature/socketcan updated: Working on SDO upload
(read) operation coordination.
This is an automated email from the ASF dual-hosted git repository.
ldywicki pushed a commit to branch feature/socketcan
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/feature/socketcan by this push:
new 2b3f594 Working on SDO upload (read) operation coordination.
2b3f594 is described below
commit 2b3f594afaabcf1b0336ea337fb501841fe997f1
Author: Ćukasz Dywicki <lu...@code-house.org>
AuthorDate: Fri Oct 9 11:46:04 2020 +0200
Working on SDO upload (read) operation coordination.
---
.../spi/transaction/RequestTransactionManager.java | 23 +++++-
.../src/main/resources/protocols/can/canopen.mspec | 9 +-
sandbox/test-java-can-driver/pom.xml | 4 +
.../api/conversation/canopen/CANConversation.java | 4 +-
.../conversation/canopen/CANOpenConversation.java | 14 +++-
.../canopen/SDOUploadConversation.java | 95 ++++++++++++++++------
.../java/can/configuration/CANConfiguration.java | 6 +-
.../plc4x/java/can/field/CANOpenPDOField.java | 19 ++++-
.../java/can/protocol/CANOpenProtocolLogic.java | 76 ++++++++++++-----
.../can/protocol/CANOpenSubscriptionHandle.java | 17 +++-
.../java/can/socketcan/SocketCANConversation.java | 32 ++++----
11 files changed, 224 insertions(+), 75 deletions(-)
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/transaction/RequestTransactionManager.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/transaction/RequestTransactionManager.java
index e49953c..5fb2ed5 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/transaction/RequestTransactionManager.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/transaction/RequestTransactionManager.java
@@ -21,6 +21,7 @@ package org.apache.plc4x.java.spi.transaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
import java.util.Objects;
import java.util.Queue;
@@ -173,7 +174,8 @@ public class RequestTransactionManager {
}
public void submit(Runnable operation) {
- this.setOperation(operation);
+ logger.trace("Submission of transaction {}", transactionId);
+ this.setOperation(new TransactionOperation(transactionId, operation));
this.parent.submit(this);
}
@@ -189,6 +191,25 @@ public class RequestTransactionManager {
public int hashCode() {
return Objects.hash(transactionId);
}
+
}
+ static class TransactionOperation implements Runnable {
+ private final int transactionId;
+ private final Runnable delegate;
+
+ public TransactionOperation(int transactionId, Runnable delegate) {
+ this.transactionId = transactionId;
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void run() {
+ try (final MDC.MDCCloseable closeable = MDC.putCloseable("plc4x.transactionId", Integer.toString(transactionId))) {
+ logger.trace("Start execution of transaction {}", transactionId);
+ delegate.run();
+ logger.trace("Completed execution of transaction {}", transactionId);
+ }
+ }
+ }
}
diff --git a/protocols/can/src/main/resources/protocols/can/canopen.mspec b/protocols/can/src/main/resources/protocols/can/canopen.mspec
index 10afdf9..7886b88 100644
--- a/protocols/can/src/main/resources/protocols/can/canopen.mspec
+++ b/protocols/can/src/main/resources/protocols/can/canopen.mspec
@@ -353,4 +353,11 @@
]
]
]
-]
\ No newline at end of file
+]
+
+// utility type quickly write data for mapped/manufacturer PDOs
+[type 'CANOpenMPDO'
+ [simple uint 8 'node']
+ [simple IndexAddress 'address']
+ [array int 8 'data' COUNT '4']
+]
diff --git a/sandbox/test-java-can-driver/pom.xml b/sandbox/test-java-can-driver/pom.xml
index e3e72c4..4ed0eaa 100644
--- a/sandbox/test-java-can-driver/pom.xml
+++ b/sandbox/test-java-can-driver/pom.xml
@@ -145,6 +145,10 @@
<artifactId>jackson-dataformat-xml</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANConversation.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANConversation.java
index 6df8b0c..be4675b 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANConversation.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANConversation.java
@@ -9,9 +9,11 @@ import org.apache.plc4x.java.spi.transaction.RequestTransactionManager.RequestTr
public interface CANConversation<W extends CANFrame> {
+ int getNodeId();
+
CANFrameBuilder<W> frameBuilder();
- void send(W frame, BiConsumer<RequestTransaction, SendRequestContext<W>> callback);
+ void send(RequestTransaction transaction, W frame, BiConsumer<RequestTransaction, SendRequestContext<W>> callback);
}
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversation.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversation.java
index abe6aa0..fee418d 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversation.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversation.java
@@ -9,16 +9,23 @@ import org.apache.plc4x.java.spi.ConversationContext.SendRequestContext;
import org.apache.plc4x.java.spi.generation.ParseException;
import org.apache.plc4x.java.spi.generation.ReadBuffer;
import org.apache.plc4x.java.spi.generation.WriteBuffer;
+import org.apache.plc4x.java.spi.transaction.RequestTransactionManager;
import org.apache.plc4x.java.spi.transaction.RequestTransactionManager.RequestTransaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
public class CANOpenConversation<W extends CANFrame> {
+ private final Logger logger = LoggerFactory.getLogger(CANOpenConversation.class);
+ private final RequestTransaction transaction;
private final int node;
private final CANConversation<W> delegate;
- public CANOpenConversation(int node, CANConversation<W> delegate) {
+ public CANOpenConversation(RequestTransaction transaction, int node, CANConversation<W> delegate) {
+ this.transaction = transaction;
this.node = node;
this.delegate = delegate;
}
@@ -30,7 +37,8 @@ public class CANOpenConversation<W extends CANFrame> {
public void send(CANOpenService service, CANOpenPayload payload, BiConsumer<RequestTransaction, SendRequestContext<CANOpenPayload>> callback) {
CANFrameBuilder<W> builder = delegate.frameBuilder();
W frame = builder.node(service.getMin() + node).data(serialize(payload)).build();
- delegate.send(frame, (tx, ctx) -> {
+ logger.info("Request data under transaction {}", transaction);
+ delegate.send(transaction, frame, (tx, ctx) -> {
SendRequestContext<CANOpenPayload> unwrap = ctx
// .onError((response, error) -> {
// System.err.println("Unexpected frame " + response + " " + error);
@@ -38,6 +46,8 @@ public class CANOpenConversation<W extends CANFrame> {
.unwrap(CANOpenConversation.this::deserialize);
callback.accept(tx, unwrap);
});
+
+
}
private CANOpenPayload deserialize(CANFrame frame) {
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOUploadConversation.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOUploadConversation.java
index cb2d778..0bb0998 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOUploadConversation.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOUploadConversation.java
@@ -7,11 +7,16 @@ import org.apache.plc4x.java.can.api.segmentation.accumulator.ByteStorage;
import org.apache.plc4x.java.canopen.readwrite.*;
import org.apache.plc4x.java.canopen.readwrite.types.CANOpenDataType;
import org.apache.plc4x.java.spi.generation.ParseException;
+import org.apache.plc4x.java.spi.transaction.RequestTransactionManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
public class SDOUploadConversation<W extends CANFrame> extends CANOpenConversationBase {
+ private final Logger logger = LoggerFactory.getLogger(SDOUploadConversation.class);
private final SDOConversation<W> delegate;
private final IndexAddress address;
private final CANOpenDataType type;
@@ -22,39 +27,56 @@ public class SDOUploadConversation<W extends CANFrame> extends CANOpenConversati
this.type = type;
}
- public void execute(BiConsumer<PlcValue, Throwable> receiver) throws PlcException {
+ public void execute(CompletableFuture<PlcValue> receiver) throws PlcException {
SDOInitiateUploadRequest rq = new SDOInitiateUploadRequest(address);
delegate.send(rq, (tx, ctx) ->
- ctx
-// .onError((response, error) -> {
-// System.err.println("Unexpected frame " + response + " " + error);
-// receiver.accept(null, error);
-// })
- .unwrap(CANOpenSDOResponse::getResponse)
- .onError(((response, error) -> {
- if (response instanceof SDOAbortResponse) {
- SDOAbortResponse abort = (SDOAbortResponse) response;
+ ctx.onError((response, error) -> {
+ System.err.println("Unexpected frame " + response + " " + error);
+ if (error != null) {
+ receiver.completeExceptionally(error);
+ return;
+ }
+ if (response.getResponse() instanceof SDOAbortResponse) {
+ SDOAbortResponse abort = (SDOAbortResponse) response.getResponse();
SDOAbort sdoAbort = abort.getAbort();
- receiver.accept(null, new PlcException("Could not read value. Remote party reported code " + sdoAbort.getCode()));
- } else {
- receiver.accept(null, error);
+ receiver.completeExceptionally(new PlcException("Could not read value. Remote party reported code " + sdoAbort.getCode()));
}
- }))
+ })
+ .check(reply -> {
+ logger.warn("Received answer {}", reply);
+ return true;
+ })
+ .unwrap(CANOpenSDOResponse::getResponse).check(reply -> {
+ logger.warn("Received answer {}", reply);
+ return true;
+ })
+ .check(reply -> {
+ logger.warn("Received answer {}", reply);
+ return true;
+ })
.only(SDOInitiateUploadResponse.class)
- .check(r -> r.getAddress().equals(address))
+ .check(resp -> {
+ logger.warn("Checking if reply address {}/{} matches {}/{}: {}",
+ Integer.toHexString(resp.getAddress().getIndex()), Integer.toHexString(resp.getAddress().getSubindex()),
+ Integer.toHexString(address.getIndex()), Integer.toHexString(address.getSubindex()),
+ resp.getAddress().equals(address)
+ );
+ return resp.getAddress().equals(address);
+ })
.handle(response -> {
- handle(receiver, response);
+ handle(tx, receiver, response);
})
);
}
- private void handle(BiConsumer<PlcValue, Throwable> receiver, SDOInitiateUploadResponse answer) {
+ private void handle(RequestTransactionManager.RequestTransaction tx, CompletableFuture<PlcValue> receiver, SDOInitiateUploadResponse answer) {
BiConsumer<Integer, byte[]> valueCallback = (length, bytes) -> {
try {
- receiver.accept(decodeFrom(bytes, type, length), null);
- } catch (ParseException e) {
- receiver.accept(null, e);
+ final PlcValue decodedValue = decodeFrom(bytes, type, length);
+ receiver.complete(decodedValue);
+ } catch (ArrayIndexOutOfBoundsException | ParseException e) {
+ receiver.completeExceptionally(e);
}
};
@@ -62,32 +84,51 @@ public class SDOUploadConversation<W extends CANFrame> extends CANOpenConversati
SDOInitiateExpeditedUploadResponse payload = (SDOInitiateExpeditedUploadResponse) answer.getPayload();
valueCallback.accept(payload.getData().length, payload.getData());
} else if (answer.getPayload() instanceof SDOInitiateSegmentedUploadResponse) {
+ logger.info("Beginning of segmented operation for address {}/{}", Integer.toHexString(address.getIndex()), Integer.toHexString(address.getSubindex()));
ByteStorage.SDOUploadStorage storage = new ByteStorage.SDOUploadStorage();
storage.append(answer);
SDOInitiateSegmentedUploadResponse segment = (SDOInitiateSegmentedUploadResponse) answer.getPayload();
fetch(storage, valueCallback, receiver, false, Long.valueOf(segment.getBytes()).intValue());
} else {
- receiver.accept(null, new PlcException("Unsupported SDO operation kind."));
+ receiver.completeExceptionally(new PlcException("Unsupported SDO operation kind."));
}
}
- private void fetch(ByteStorage.SDOUploadStorage storage, BiConsumer<Integer, byte[]> valueCallback, BiConsumer<PlcValue, Throwable> receiver, boolean toggle, int size) {
+ private void fetch(ByteStorage.SDOUploadStorage storage, BiConsumer<Integer, byte[]> valueCallback, CompletableFuture<PlcValue> receiver, boolean toggle, int size) {
+ logger.info("Request next data block for address {}/{}", Integer.toHexString(address.getIndex()), Integer.toHexString(address.getSubindex()));
delegate.send(new SDOSegmentUploadRequest(toggle), (tx, ctx) -> {
ctx.unwrap(CANOpenSDOResponse::getResponse)
- .only(SDOSegmentUploadResponse.class)
.onError((response, error) -> {
System.out.println("Unexpected frame " + response + " " + error);
- receiver.accept(null, error);
+ if (error != null) {
+ receiver.completeExceptionally(error);
+ return;
+ }
+
+ if (response instanceof SDOAbortResponse) {
+ SDOAbortResponse abort = (SDOAbortResponse) response;
+ SDOAbort sdoAbort = abort.getAbort();
+ receiver.completeExceptionally(new PlcException("Could not read value. Remote party reported code " + sdoAbort.getCode()));
+ }
})
+ .only(SDOSegmentUploadResponse.class)
.check(r -> r.getToggle() == toggle)
- .handle(reply -> {
- storage.append(reply);
+ .handle(response -> {
+// if (!reply.getToggle() == toggle) { // toggle flag is wrong, abort transaction
+// logger.info("Received invalid answer from party for {}", address);
+// delegate.send(new SDOAbortRequest(new SDOAbort(address, 0x100)), (tx2, ctx2) -> {});
+// return;
+// }
+
+ storage.append(response);
- if (reply.getLast()) {
+ if (response.getLast()) {
// validate size
+ logger.trace("Completed reading of data from {}/{}, collected {}, wanted {}", Integer.toHexString(address.getIndex()), Integer.toHexString(address.getSubindex()), storage.size(), size);
valueCallback.accept(Long.valueOf(size).intValue(), storage.get());
} else {
+ logger.trace("Continue reading of data from {}/{}, collected {}, wanted {}", Integer.toHexString(address.getIndex()), Integer.toHexString(address.getSubindex()), storage.size(), size);
fetch(storage, valueCallback, receiver, !toggle, size);
}
});
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/configuration/CANConfiguration.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/configuration/CANConfiguration.java
index fcb7194..af4188a 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/configuration/CANConfiguration.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/configuration/CANConfiguration.java
@@ -28,7 +28,7 @@ public class CANConfiguration implements Configuration, CANTransportConfiguratio
private int nodeId;
@ConfigurationParameter
- private boolean hearbeat;
+ private boolean heartbeat;
public int getNodeId() {
return nodeId;
@@ -39,11 +39,11 @@ public class CANConfiguration implements Configuration, CANTransportConfiguratio
}
public boolean isHeartbeat() {
- return hearbeat;
+ return heartbeat;
}
public void setHeartbeat(boolean heartbeat) {
- this.hearbeat = heartbeat;
+ this.heartbeat = heartbeat;
}
}
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/field/CANOpenPDOField.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/field/CANOpenPDOField.java
index f65e5b3..2761260 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/field/CANOpenPDOField.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/field/CANOpenPDOField.java
@@ -20,17 +20,21 @@ package org.apache.plc4x.java.can.field;
import org.apache.plc4x.java.api.exceptions.PlcInvalidFieldException;
import org.apache.plc4x.java.canopen.readwrite.types.CANOpenDataType;
+import org.apache.plc4x.java.canopen.readwrite.types.CANOpenService;
+import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class CANOpenPDOField extends CANOpenField {
- public static final Pattern ADDRESS_PATTERN = Pattern.compile("PDO:" + CANOpenField.NODE_PATTERN + ":(?<canDataType>\\w+)(\\[(?<numberOfElements>\\d)])?");
+ public static final Pattern ADDRESS_PATTERN = Pattern.compile("(?<pdo>(?:RECEIVE|TRANSMIT)_PDO_[1-4]):" + CANOpenField.NODE_PATTERN + ":(?<canDataType>\\w+)(\\[(?<numberOfElements>\\d)])?");
+ private final CANOpenService service;
private final CANOpenDataType canOpenDataType;
- public CANOpenPDOField(int node, CANOpenDataType canOpenDataType) {
+ public CANOpenPDOField(int node, CANOpenService service, CANOpenDataType canOpenDataType) {
super(node);
+ this.service = service;
this.canOpenDataType = canOpenDataType;
}
@@ -55,10 +59,19 @@ public class CANOpenPDOField extends CANOpenField {
Matcher matcher = getMatcher(addressString);
int nodeId = Integer.parseInt(matcher.group("nodeId"));
+ String pdo = matcher.group("pdo");
+ CANOpenService service = CANOpenService.valueOf(pdo);
+ if (service == null) {
+ throw new IllegalArgumentException("Invalid PDO detected " + pdo);
+ }
+
String canDataTypeString = matcher.group("canDataType");
CANOpenDataType canOpenDataType = CANOpenDataType.valueOf(canDataTypeString);
- return new CANOpenPDOField(nodeId, canOpenDataType);
+ return new CANOpenPDOField(nodeId, service, canOpenDataType);
}
+ public CANOpenService getService() {
+ return service;
+ }
}
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenProtocolLogic.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenProtocolLogic.java
index d96fe19..e384d0a 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenProtocolLogic.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenProtocolLogic.java
@@ -18,6 +18,8 @@ under the License.
*/
package org.apache.plc4x.java.can.protocol;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.api.messages.*;
import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
import org.apache.plc4x.java.api.model.PlcField;
@@ -38,7 +40,10 @@ import org.apache.plc4x.java.can.field.CANOpenField;
import org.apache.plc4x.java.can.field.CANOpenPDOField;
import org.apache.plc4x.java.can.field.CANOpenSDOField;
import org.apache.plc4x.java.can.socketcan.SocketCANConversation;
-import org.apache.plc4x.java.canopen.readwrite.*;
+import org.apache.plc4x.java.canopen.readwrite.CANOpenHeartbeatPayload;
+import org.apache.plc4x.java.canopen.readwrite.CANOpenPDOPayload;
+import org.apache.plc4x.java.canopen.readwrite.CANOpenPayload;
+import org.apache.plc4x.java.canopen.readwrite.IndexAddress;
import org.apache.plc4x.java.canopen.readwrite.io.CANOpenHeartbeatPayloadIO;
import org.apache.plc4x.java.canopen.readwrite.io.CANOpenPayloadIO;
import org.apache.plc4x.java.canopen.readwrite.io.DataItemIO;
@@ -61,6 +66,8 @@ import org.apache.plc4x.java.spi.transaction.RequestTransactionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
@@ -91,8 +98,6 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl
@Override
public void setConfiguration(CANConfiguration configuration) {
this.configuration = configuration;
- // Set the transaction manager to allow only one message at a time.
- this.tm = new RequestTransactionManager(1);
}
@Override
@@ -135,7 +140,7 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl
@Override
public void setContext(ConversationContext<SocketCANFrame> context) {
super.setContext(context);
- this.conversation = new SocketCANConversation(tm, context);
+ this.conversation = new SocketCANConversation(configuration.getNodeId(), context);
}
private SocketCANFrame createFrame(CANOpenHeartbeatPayload state) throws ParseException {
@@ -171,7 +176,8 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl
}
private void writeInternally(InternalPlcWriteRequest writeRequest, CANOpenSDOField field, CompletableFuture<PlcWriteResponse> response) {
- CANOpenConversation<CANFrame> canopen = new CANOpenConversation<>(field.getNodeId(), conversation);
+ final RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
+ CANOpenConversation<CANFrame> canopen = new CANOpenConversation<>(transaction, field.getNodeId(), conversation);
PlcValue writeValue = writeRequest.getPlcValues().get(0);
@@ -180,6 +186,7 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl
download.execute((value, error) -> {
String fieldName = writeRequest.getFieldNames().iterator().next();
response.complete(new DefaultPlcWriteResponse(writeRequest, Collections.singletonMap(fieldName, PlcResponseCode.OK)));
+ transaction.endRequest();
});
} catch (Exception e) {
response.completeExceptionally(e);
@@ -191,10 +198,10 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl
try {
String fieldName = writeRequest.getFieldNames().iterator().next();
- //
- WriteBuffer buffer = DataItemIO.staticSerialize(writeValue, field.getCanOpenDataType(), writeValue.getLength() / 8, true);
+ WriteBuffer buffer = DataItemIO.staticSerialize(writeValue, field.getCanOpenDataType(), writeValue.getLength(), true);
if (buffer != null) {
- context.sendToWire(new SocketCANFrame(field.getNodeId(), buffer.getData()));
+ int cob = field.getService().getMin() + field.getNodeId();
+ context.sendToWire(new SocketCANFrame(cob, buffer.getData()));
response.complete(new DefaultPlcWriteResponse(writeRequest, Collections.singletonMap(fieldName, PlcResponseCode.OK)));
} else {
response.complete(new DefaultPlcWriteResponse(writeRequest, Collections.singletonMap(fieldName, PlcResponseCode.INVALID_DATA)));
@@ -252,16 +259,27 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl
}
private void readInternally(InternalPlcReadRequest readRequest, CANOpenSDOField field, CompletableFuture<PlcReadResponse> response) {
- CANOpenConversation<CANFrame> canopen = new CANOpenConversation<>(field.getNodeId(), conversation);
-
- SDOUploadConversation<CANFrame> upload = canopen.sdo().upload(new IndexAddress(field.getIndex(), field.getSubIndex()), field.getCanOpenDataType());
try {
- upload.execute((value, error) -> {
+ final RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
+ CANOpenConversation<CANFrame> canopen = new CANOpenConversation<>(transaction, field.getNodeId(), conversation);
+ System.out.println("----> Submit read " + field.getIndex() + "/" + field.getSubIndex() + " from " + field.getNodeId() + " " + transaction);
+ SDOUploadConversation<CANFrame> upload = canopen.sdo().upload(new IndexAddress(field.getIndex(), field.getSubIndex()), field.getCanOpenDataType());
+ CompletableFuture<PlcValue> callback = new CompletableFuture<>();
+ callback.whenComplete((value, error) -> {
+ System.out.println("<---- Received reply " + field.getIndex() + "/" + field.getSubIndex() + " from " + field.getNodeId() + " " + value + "/" + error + " " + transaction);
+ if (error != null) {
+ response.completeExceptionally(error);
+ transaction.endRequest();
+ return;
+ }
+
String fieldName = readRequest.getFieldNames().iterator().next();
Map<String, ResponseItem<PlcValue>> fields = new HashMap<>();
fields.put(fieldName, new ResponseItem<>(PlcResponseCode.OK, value));
response.complete(new DefaultPlcReadResponse(readRequest, fields));
+ transaction.endRequest();
});
+ upload.execute(callback);
} catch (Exception e) {
response.completeExceptionally(e);
}
@@ -275,13 +293,23 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl
CANOpenDriverContext.CALLBACK.receive(msg);
if (service != null) {
- logger.info("Decoded CANOpen {} from {}, message {}", service, Math.abs(service.getMin() - msg.getIdentifier()), payload);
+ int nodeId = Math.abs(service.getMin() - msg.getIdentifier());
if (service.getPdo() && payload instanceof CANOpenPDOPayload) {
- logger.info("Broadcasting PDO to subscribers");
- publishEvent(msg.getIdentifier(), (CANOpenPDOPayload) payload);
+ publishEvent(service, nodeId, (CANOpenPDOPayload) payload);
+ } else {
+ String hex = "";
+ if (logger.isInfoEnabled()) {
+ try {
+ final WriteBuffer buffer = new WriteBuffer(payload.getLengthInBytes(), true);
+ CANOpenPayloadIO.staticSerialize(buffer, payload);
+ hex = Hex.encodeHexString(buffer.getData());
+ } catch (ParseException e) {
+ e.printStackTrace();
+ }
+ }
+ logger.info("Decoded CANOpen {} from {}, message {}, {}", service, nodeId, payload, hex);
}
-
} else {
logger.info("CAN message {}, {}", msg.getIdentifier(), msg);
}
@@ -296,7 +324,8 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl
// }
}
- private void publishEvent(int nodeId, CANOpenPDOPayload payload) {
+ private void publishEvent(CANOpenService service, int nodeId, CANOpenPDOPayload payload) {
+ CANOpenSubscriptionHandle dispatchedHandle = null;
for (Map.Entry<DefaultPlcConsumerRegistration, Consumer<PlcSubscriptionEvent>> entry : consumers.entrySet()) {
DefaultPlcConsumerRegistration registration = entry.getKey();
Consumer<PlcSubscriptionEvent> consumer = entry.getValue();
@@ -305,7 +334,10 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl
if (handler instanceof CANOpenSubscriptionHandle) {
CANOpenSubscriptionHandle handle = (CANOpenSubscriptionHandle) handler;
- if (handle.matches(nodeId)) {
+ if (handle.matches(service, nodeId)) {
+ logger.trace("Dispatching notification {} for node {} to {}", service, nodeId, handle);
+ dispatchedHandle = handle;
+
CANOpenPDOField field = handle.getField();
byte[] data = payload.getPdo().getData();
try {
@@ -329,10 +361,15 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl
);
consumer.accept(event);
}
+ } else {
}
}
}
}
+
+ if (dispatchedHandle == null) {
+ logger.trace("Could not find subscription matching {} and node {}", service, nodeId);
+ }
}
@Override
@@ -362,8 +399,7 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl
private int cobId(int nodeId, CANOpenService service) {
// form 32 bit socketcan identifier
- return (nodeId << 24) & 0xff000000 |
- (service.getValue() << 16 ) & 0x00ff0000;
+ return service.getMin() + nodeId;
}
private CANOpenService serviceId(int cobId) {
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenSubscriptionHandle.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenSubscriptionHandle.java
index 07ecb3b..9bfd688 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenSubscriptionHandle.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenSubscriptionHandle.java
@@ -1,6 +1,7 @@
package org.apache.plc4x.java.can.protocol;
import org.apache.plc4x.java.can.field.CANOpenPDOField;
+import org.apache.plc4x.java.canopen.readwrite.types.CANOpenService;
import org.apache.plc4x.java.spi.messages.PlcSubscriber;
import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionHandle;
@@ -14,8 +15,11 @@ public class CANOpenSubscriptionHandle extends DefaultPlcSubscriptionHandle {
this.field = field;
}
- public boolean matches(int identifier) {
- return field.getNodeId() == 0 || field.getNodeId() == identifier;
+ public boolean matches(CANOpenService service, int identifier) {
+ if (field.getService() != service) {
+ return false;
+ }
+ return field.getNodeId() == identifier;
}
public String getName() {
@@ -25,4 +29,13 @@ public class CANOpenSubscriptionHandle extends DefaultPlcSubscriptionHandle {
public CANOpenPDOField getField() {
return field;
}
+
+ public String toString() {
+ return "CANOpenSubscriptionHandle [service=" + field.getService() + ", node=" + intAndHex(field.getNodeId()) + ", cob=" + intAndHex(field.getService().getMin() + field.getNodeId()) + "]";
+ }
+
+ private static String intAndHex(int val) {
+ return val + "(0x" + Integer.toHexString(val) + ")";
+ }
+
}
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/socketcan/SocketCANConversation.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/socketcan/SocketCANConversation.java
index 8b01c2a..c35843d 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/socketcan/SocketCANConversation.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/socketcan/SocketCANConversation.java
@@ -7,7 +7,6 @@ import org.apache.plc4x.java.can.api.conversation.canopen.CANFrameBuilder;
import org.apache.plc4x.java.socketcan.readwrite.SocketCANFrame;
import org.apache.plc4x.java.spi.ConversationContext;
import org.apache.plc4x.java.spi.ConversationContext.SendRequestContext;
-import org.apache.plc4x.java.spi.transaction.RequestTransactionManager;
import org.apache.plc4x.java.spi.transaction.RequestTransactionManager.RequestTransaction;
import java.time.Duration;
@@ -15,32 +14,35 @@ import java.util.function.BiConsumer;
public class SocketCANConversation implements CANConversation<CANFrame> {
- private final RequestTransactionManager tm;
+ private final int nodeId;
private final ConversationContext<SocketCANFrame> context;
- public SocketCANConversation(RequestTransactionManager tm, ConversationContext<SocketCANFrame> context) {
- this.tm = tm;
+ public SocketCANConversation(int nodeId, ConversationContext<SocketCANFrame> context) {
+ this.nodeId = nodeId;
this.context = context;
}
@Override
+ public int getNodeId() {
+ return nodeId;
+ }
+
+ @Override
public CANFrameBuilder<CANFrame> frameBuilder() {
return new SocketCANFrameBuilder();
}
@Override
- public void send(CANFrame frame, BiConsumer<RequestTransaction, SendRequestContext<CANFrame>> callback) {
+ public void send(RequestTransaction transaction, CANFrame frame, BiConsumer<RequestTransaction, SendRequestContext<CANFrame>> callback) {
if (frame instanceof SocketCANDelegateFrame) {
- RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
-
- ConversationContext.SendRequestContext<CANFrame> ctx = context.sendRequest(((SocketCANDelegateFrame) frame).getFrame())
- .expectResponse(SocketCANFrame.class, Duration.ofSeconds(10L))
-// .onError((response, error) -> {
-// System.err.println("Unexpected frame " + response + " " + error);
-// })
- .unwrap(SocketCANDelegateFrame::new);
- //return CompletableFuture.completedFuture(new SocketCANTransactionContext<>(transaction, ctx));
- callback.accept(transaction, ctx);
+ System.out.println("-----> Sending request frame " + transaction);
+ transaction.submit(() -> {
+ ConversationContext.SendRequestContext<CANFrame> ctx = context.sendRequest(((SocketCANDelegateFrame) frame).getFrame())
+ .expectResponse(SocketCANFrame.class, Duration.ofSeconds(10L))
+ .unwrap(SocketCANDelegateFrame::new);
+ System.out.println("-----> Frame been sent " + transaction);
+ callback.accept(transaction, ctx);
+ });
return;
}
throw new PlcRuntimeException("Unsupported frame type " + frame);