You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by ld...@apache.org on 2020/10/09 09:49:13 UTC

[plc4x] branch feature/socketcan updated: Working on SDO upload (read) operation coordination.

This is an automated email from the ASF dual-hosted git repository.

ldywicki pushed a commit to branch feature/socketcan
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/feature/socketcan by this push:
     new 2b3f594  Working on SDO upload (read) operation coordination.
2b3f594 is described below

commit 2b3f594afaabcf1b0336ea337fb501841fe997f1
Author: Ɓukasz Dywicki <lu...@code-house.org>
AuthorDate: Fri Oct 9 11:46:04 2020 +0200

    Working on SDO upload (read) operation coordination.
---
 .../spi/transaction/RequestTransactionManager.java | 23 +++++-
 .../src/main/resources/protocols/can/canopen.mspec |  9 +-
 sandbox/test-java-can-driver/pom.xml               |  4 +
 .../api/conversation/canopen/CANConversation.java  |  4 +-
 .../conversation/canopen/CANOpenConversation.java  | 14 +++-
 .../canopen/SDOUploadConversation.java             | 95 ++++++++++++++++------
 .../java/can/configuration/CANConfiguration.java   |  6 +-
 .../plc4x/java/can/field/CANOpenPDOField.java      | 19 ++++-
 .../java/can/protocol/CANOpenProtocolLogic.java    | 76 ++++++++++++-----
 .../can/protocol/CANOpenSubscriptionHandle.java    | 17 +++-
 .../java/can/socketcan/SocketCANConversation.java  | 32 ++++----
 11 files changed, 224 insertions(+), 75 deletions(-)

diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/transaction/RequestTransactionManager.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/transaction/RequestTransactionManager.java
index e49953c..5fb2ed5 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/transaction/RequestTransactionManager.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/transaction/RequestTransactionManager.java
@@ -21,6 +21,7 @@ package org.apache.plc4x.java.spi.transaction;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
 
 import java.util.Objects;
 import java.util.Queue;
@@ -173,7 +174,8 @@ public class RequestTransactionManager {
         }
 
         public void submit(Runnable operation) {
-            this.setOperation(operation);
+            logger.trace("Submission of transaction {}", transactionId);
+            this.setOperation(new TransactionOperation(transactionId, operation));
             this.parent.submit(this);
         }
 
@@ -189,6 +191,25 @@ public class RequestTransactionManager {
         public int hashCode() {
             return Objects.hash(transactionId);
         }
+
     }
 
+    static class TransactionOperation implements Runnable {
+        private final int transactionId;
+        private final Runnable delegate;
+
+        public TransactionOperation(int transactionId, Runnable delegate) {
+            this.transactionId = transactionId;
+            this.delegate = delegate;
+        }
+
+        @Override
+        public void run() {
+            try (final MDC.MDCCloseable closeable = MDC.putCloseable("plc4x.transactionId", Integer.toString(transactionId))) {
+                logger.trace("Start execution of transaction {}", transactionId);
+                delegate.run();
+                logger.trace("Completed execution of transaction {}", transactionId);
+            }
+        }
+    }
 }
diff --git a/protocols/can/src/main/resources/protocols/can/canopen.mspec b/protocols/can/src/main/resources/protocols/can/canopen.mspec
index 10afdf9..7886b88 100644
--- a/protocols/can/src/main/resources/protocols/can/canopen.mspec
+++ b/protocols/can/src/main/resources/protocols/can/canopen.mspec
@@ -353,4 +353,11 @@
             ]
         ]
     ]
-]
\ No newline at end of file
+]
+
+// utility type quickly write data for mapped/manufacturer PDOs
+[type 'CANOpenMPDO'
+    [simple uint 8 'node']
+    [simple IndexAddress 'address']
+    [array int 8 'data' COUNT '4']
+]
diff --git a/sandbox/test-java-can-driver/pom.xml b/sandbox/test-java-can-driver/pom.xml
index e3e72c4..4ed0eaa 100644
--- a/sandbox/test-java-can-driver/pom.xml
+++ b/sandbox/test-java-can-driver/pom.xml
@@ -145,6 +145,10 @@
       <artifactId>jackson-dataformat-xml</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+    </dependency>
   </dependencies>
 
 </project>
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANConversation.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANConversation.java
index 6df8b0c..be4675b 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANConversation.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANConversation.java
@@ -9,9 +9,11 @@ import org.apache.plc4x.java.spi.transaction.RequestTransactionManager.RequestTr
 
 public interface CANConversation<W extends CANFrame> {
 
+    int getNodeId();
+
     CANFrameBuilder<W> frameBuilder();
 
-    void send(W frame, BiConsumer<RequestTransaction, SendRequestContext<W>> callback);
+    void send(RequestTransaction transaction, W frame, BiConsumer<RequestTransaction, SendRequestContext<W>> callback);
 
 
 }
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversation.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversation.java
index abe6aa0..fee418d 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversation.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversation.java
@@ -9,16 +9,23 @@ import org.apache.plc4x.java.spi.ConversationContext.SendRequestContext;
 import org.apache.plc4x.java.spi.generation.ParseException;
 import org.apache.plc4x.java.spi.generation.ReadBuffer;
 import org.apache.plc4x.java.spi.generation.WriteBuffer;
+import org.apache.plc4x.java.spi.transaction.RequestTransactionManager;
 import org.apache.plc4x.java.spi.transaction.RequestTransactionManager.RequestTransaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.function.BiConsumer;
 
 public class CANOpenConversation<W extends CANFrame> {
 
+    private final Logger logger = LoggerFactory.getLogger(CANOpenConversation.class);
+    private final RequestTransaction transaction;
     private final int node;
     private final CANConversation<W> delegate;
 
-    public CANOpenConversation(int node, CANConversation<W> delegate) {
+    public CANOpenConversation(RequestTransaction transaction, int node, CANConversation<W> delegate) {
+        this.transaction = transaction;
         this.node = node;
         this.delegate = delegate;
     }
@@ -30,7 +37,8 @@ public class CANOpenConversation<W extends CANFrame> {
     public void send(CANOpenService service, CANOpenPayload payload, BiConsumer<RequestTransaction, SendRequestContext<CANOpenPayload>> callback) {
         CANFrameBuilder<W> builder = delegate.frameBuilder();
         W frame = builder.node(service.getMin() + node).data(serialize(payload)).build();
-        delegate.send(frame, (tx, ctx) -> {
+        logger.info("Request data under transaction {}", transaction);
+        delegate.send(transaction, frame, (tx, ctx) -> {
             SendRequestContext<CANOpenPayload> unwrap = ctx
 //                .onError((response, error) -> {
 //                    System.err.println("Unexpected frame " + response + " " + error);
@@ -38,6 +46,8 @@ public class CANOpenConversation<W extends CANFrame> {
             .unwrap(CANOpenConversation.this::deserialize);
             callback.accept(tx, unwrap);
         });
+
+
     }
 
     private CANOpenPayload deserialize(CANFrame frame) {
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOUploadConversation.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOUploadConversation.java
index cb2d778..0bb0998 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOUploadConversation.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOUploadConversation.java
@@ -7,11 +7,16 @@ import org.apache.plc4x.java.can.api.segmentation.accumulator.ByteStorage;
 import org.apache.plc4x.java.canopen.readwrite.*;
 import org.apache.plc4x.java.canopen.readwrite.types.CANOpenDataType;
 import org.apache.plc4x.java.spi.generation.ParseException;
+import org.apache.plc4x.java.spi.transaction.RequestTransactionManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.function.BiConsumer;
 
 public class SDOUploadConversation<W extends CANFrame> extends CANOpenConversationBase {
 
+    private final Logger logger = LoggerFactory.getLogger(SDOUploadConversation.class);
     private final SDOConversation<W> delegate;
     private final IndexAddress address;
     private final CANOpenDataType type;
@@ -22,39 +27,56 @@ public class SDOUploadConversation<W extends CANFrame> extends CANOpenConversati
         this.type = type;
     }
 
-    public void execute(BiConsumer<PlcValue, Throwable> receiver) throws PlcException {
+    public void execute(CompletableFuture<PlcValue> receiver) throws PlcException {
         SDOInitiateUploadRequest rq = new SDOInitiateUploadRequest(address);
 
         delegate.send(rq, (tx, ctx) ->
-            ctx
-//            .onError((response, error) -> {
-//                System.err.println("Unexpected frame " + response + " " + error);
-//                receiver.accept(null, error);
-//            })
-            .unwrap(CANOpenSDOResponse::getResponse)
-            .onError(((response, error) -> {
-                if (response instanceof SDOAbortResponse) {
-                    SDOAbortResponse abort = (SDOAbortResponse) response;
+            ctx.onError((response, error) -> {
+                System.err.println("Unexpected frame " + response + " " + error);
+                if (error != null) {
+                    receiver.completeExceptionally(error);
+                    return;
+                }
+                if (response.getResponse() instanceof SDOAbortResponse) {
+                    SDOAbortResponse abort = (SDOAbortResponse) response.getResponse();
                     SDOAbort sdoAbort = abort.getAbort();
-                    receiver.accept(null, new PlcException("Could not read value. Remote party reported code " + sdoAbort.getCode()));
-                } else {
-                    receiver.accept(null, error);
+                    receiver.completeExceptionally(new PlcException("Could not read value. Remote party reported code " + sdoAbort.getCode()));
                 }
-            }))
+            })
+            .check(reply -> {
+                logger.warn("Received answer {}", reply);
+                return true;
+            })
+            .unwrap(CANOpenSDOResponse::getResponse).check(reply -> {
+                logger.warn("Received answer {}", reply);
+                return true;
+            })
+            .check(reply -> {
+                logger.warn("Received answer {}", reply);
+                return true;
+            })
             .only(SDOInitiateUploadResponse.class)
-            .check(r -> r.getAddress().equals(address))
+            .check(resp -> {
+                logger.warn("Checking if reply address {}/{} matches {}/{}: {}",
+                    Integer.toHexString(resp.getAddress().getIndex()), Integer.toHexString(resp.getAddress().getSubindex()),
+                    Integer.toHexString(address.getIndex()), Integer.toHexString(address.getSubindex()),
+                    resp.getAddress().equals(address)
+                );
+                return resp.getAddress().equals(address);
+            })
             .handle(response -> {
-                handle(receiver, response);
+                handle(tx, receiver, response);
             })
         );
     }
 
-    private void handle(BiConsumer<PlcValue, Throwable> receiver, SDOInitiateUploadResponse answer) {
+    private void handle(RequestTransactionManager.RequestTransaction tx, CompletableFuture<PlcValue> receiver, SDOInitiateUploadResponse answer) {
         BiConsumer<Integer, byte[]> valueCallback = (length, bytes) -> {
             try {
-                receiver.accept(decodeFrom(bytes, type, length), null);
-            } catch (ParseException e) {
-                receiver.accept(null, e);
+                final PlcValue decodedValue = decodeFrom(bytes, type, length);
+                receiver.complete(decodedValue);
+            } catch (ArrayIndexOutOfBoundsException | ParseException e) {
+                receiver.completeExceptionally(e);
             }
         };
 
@@ -62,32 +84,51 @@ public class SDOUploadConversation<W extends CANFrame> extends CANOpenConversati
             SDOInitiateExpeditedUploadResponse payload = (SDOInitiateExpeditedUploadResponse) answer.getPayload();
             valueCallback.accept(payload.getData().length, payload.getData());
         } else if (answer.getPayload() instanceof SDOInitiateSegmentedUploadResponse) {
+            logger.info("Beginning of segmented operation for address {}/{}", Integer.toHexString(address.getIndex()), Integer.toHexString(address.getSubindex()));
             ByteStorage.SDOUploadStorage storage = new ByteStorage.SDOUploadStorage();
             storage.append(answer);
 
             SDOInitiateSegmentedUploadResponse segment = (SDOInitiateSegmentedUploadResponse) answer.getPayload();
             fetch(storage, valueCallback, receiver, false, Long.valueOf(segment.getBytes()).intValue());
         } else {
-            receiver.accept(null, new PlcException("Unsupported SDO operation kind."));
+            receiver.completeExceptionally(new PlcException("Unsupported SDO operation kind."));
         }
     }
 
-    private void fetch(ByteStorage.SDOUploadStorage storage, BiConsumer<Integer, byte[]> valueCallback, BiConsumer<PlcValue, Throwable> receiver, boolean toggle, int size) {
+    private void fetch(ByteStorage.SDOUploadStorage storage, BiConsumer<Integer, byte[]> valueCallback, CompletableFuture<PlcValue> receiver, boolean toggle, int size) {
+        logger.info("Request next data block for address {}/{}", Integer.toHexString(address.getIndex()), Integer.toHexString(address.getSubindex()));
         delegate.send(new SDOSegmentUploadRequest(toggle), (tx, ctx) -> {
             ctx.unwrap(CANOpenSDOResponse::getResponse)
-                .only(SDOSegmentUploadResponse.class)
                 .onError((response, error) -> {
                     System.out.println("Unexpected frame " + response + " " + error);
-                    receiver.accept(null, error);
+                    if (error != null) {
+                        receiver.completeExceptionally(error);
+                        return;
+                    }
+
+                    if (response instanceof SDOAbortResponse) {
+                        SDOAbortResponse abort = (SDOAbortResponse) response;
+                        SDOAbort sdoAbort = abort.getAbort();
+                        receiver.completeExceptionally(new PlcException("Could not read value. Remote party reported code " + sdoAbort.getCode()));
+                    }
                 })
+                .only(SDOSegmentUploadResponse.class)
                 .check(r -> r.getToggle() == toggle)
-                .handle(reply -> {
-                    storage.append(reply);
+                .handle(response -> {
+//                    if (!reply.getToggle() == toggle) { // toggle flag is wrong, abort transaction
+//                        logger.info("Received invalid answer from party for {}", address);
+//                        delegate.send(new SDOAbortRequest(new SDOAbort(address, 0x100)), (tx2, ctx2) -> {});
+//                        return;
+//                    }
+
+                    storage.append(response);
 
-                    if (reply.getLast()) {
+                    if (response.getLast()) {
                         // validate size
+                        logger.trace("Completed reading of data from {}/{}, collected {}, wanted {}", Integer.toHexString(address.getIndex()), Integer.toHexString(address.getSubindex()), storage.size(), size);
                         valueCallback.accept(Long.valueOf(size).intValue(), storage.get());
                     } else {
+                        logger.trace("Continue reading of data from {}/{}, collected {}, wanted {}", Integer.toHexString(address.getIndex()), Integer.toHexString(address.getSubindex()), storage.size(), size);
                         fetch(storage, valueCallback, receiver, !toggle, size);
                     }
                 });
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/configuration/CANConfiguration.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/configuration/CANConfiguration.java
index fcb7194..af4188a 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/configuration/CANConfiguration.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/configuration/CANConfiguration.java
@@ -28,7 +28,7 @@ public class CANConfiguration implements Configuration, CANTransportConfiguratio
     private int nodeId;
 
     @ConfigurationParameter
-    private boolean hearbeat;
+    private boolean heartbeat;
 
     public int getNodeId() {
         return nodeId;
@@ -39,11 +39,11 @@ public class CANConfiguration implements Configuration, CANTransportConfiguratio
     }
 
     public boolean isHeartbeat() {
-        return hearbeat;
+        return heartbeat;
     }
 
     public void setHeartbeat(boolean heartbeat) {
-        this.hearbeat = heartbeat;
+        this.heartbeat = heartbeat;
     }
 
 }
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/field/CANOpenPDOField.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/field/CANOpenPDOField.java
index f65e5b3..2761260 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/field/CANOpenPDOField.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/field/CANOpenPDOField.java
@@ -20,17 +20,21 @@ package org.apache.plc4x.java.can.field;
 
 import org.apache.plc4x.java.api.exceptions.PlcInvalidFieldException;
 import org.apache.plc4x.java.canopen.readwrite.types.CANOpenDataType;
+import org.apache.plc4x.java.canopen.readwrite.types.CANOpenService;
 
+import java.util.Optional;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 public class CANOpenPDOField extends CANOpenField {
 
-    public static final Pattern ADDRESS_PATTERN = Pattern.compile("PDO:" + CANOpenField.NODE_PATTERN + ":(?<canDataType>\\w+)(\\[(?<numberOfElements>\\d)])?");
+    public static final Pattern ADDRESS_PATTERN = Pattern.compile("(?<pdo>(?:RECEIVE|TRANSMIT)_PDO_[1-4]):" + CANOpenField.NODE_PATTERN + ":(?<canDataType>\\w+)(\\[(?<numberOfElements>\\d)])?");
+    private final CANOpenService service;
     private final CANOpenDataType canOpenDataType;
 
-    public CANOpenPDOField(int node, CANOpenDataType canOpenDataType) {
+    public CANOpenPDOField(int node, CANOpenService service, CANOpenDataType canOpenDataType) {
         super(node);
+        this.service = service;
         this.canOpenDataType = canOpenDataType;
     }
 
@@ -55,10 +59,19 @@ public class CANOpenPDOField extends CANOpenField {
         Matcher matcher = getMatcher(addressString);
         int nodeId = Integer.parseInt(matcher.group("nodeId"));
 
+        String pdo = matcher.group("pdo");
+        CANOpenService service = CANOpenService.valueOf(pdo);
+        if (service == null) {
+            throw new IllegalArgumentException("Invalid PDO detected " + pdo);
+        }
+
         String canDataTypeString = matcher.group("canDataType");
         CANOpenDataType canOpenDataType = CANOpenDataType.valueOf(canDataTypeString);
 
-        return new CANOpenPDOField(nodeId, canOpenDataType);
+        return new CANOpenPDOField(nodeId, service, canOpenDataType);
     }
 
+    public CANOpenService getService() {
+        return service;
+    }
 }
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenProtocolLogic.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenProtocolLogic.java
index d96fe19..e384d0a 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenProtocolLogic.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenProtocolLogic.java
@@ -18,6 +18,8 @@ under the License.
 */
 package org.apache.plc4x.java.can.protocol;
 
+import org.apache.commons.codec.binary.Hex;
+import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
 import org.apache.plc4x.java.api.messages.*;
 import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
 import org.apache.plc4x.java.api.model.PlcField;
@@ -38,7 +40,10 @@ import org.apache.plc4x.java.can.field.CANOpenField;
 import org.apache.plc4x.java.can.field.CANOpenPDOField;
 import org.apache.plc4x.java.can.field.CANOpenSDOField;
 import org.apache.plc4x.java.can.socketcan.SocketCANConversation;
-import org.apache.plc4x.java.canopen.readwrite.*;
+import org.apache.plc4x.java.canopen.readwrite.CANOpenHeartbeatPayload;
+import org.apache.plc4x.java.canopen.readwrite.CANOpenPDOPayload;
+import org.apache.plc4x.java.canopen.readwrite.CANOpenPayload;
+import org.apache.plc4x.java.canopen.readwrite.IndexAddress;
 import org.apache.plc4x.java.canopen.readwrite.io.CANOpenHeartbeatPayloadIO;
 import org.apache.plc4x.java.canopen.readwrite.io.CANOpenPayloadIO;
 import org.apache.plc4x.java.canopen.readwrite.io.DataItemIO;
@@ -61,6 +66,8 @@ import org.apache.plc4x.java.spi.transaction.RequestTransactionManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Collection;
@@ -91,8 +98,6 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl
     @Override
     public void setConfiguration(CANConfiguration configuration) {
         this.configuration = configuration;
-        // Set the transaction manager to allow only one message at a time.
-        this.tm = new RequestTransactionManager(1);
     }
 
     @Override
@@ -135,7 +140,7 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl
     @Override
     public void setContext(ConversationContext<SocketCANFrame> context) {
         super.setContext(context);
-        this.conversation = new SocketCANConversation(tm, context);
+        this.conversation = new SocketCANConversation(configuration.getNodeId(), context);
     }
 
     private SocketCANFrame createFrame(CANOpenHeartbeatPayload state) throws ParseException {
@@ -171,7 +176,8 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl
     }
 
     private void writeInternally(InternalPlcWriteRequest writeRequest, CANOpenSDOField field, CompletableFuture<PlcWriteResponse> response) {
-        CANOpenConversation<CANFrame> canopen = new CANOpenConversation<>(field.getNodeId(), conversation);
+        final RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
+        CANOpenConversation<CANFrame> canopen = new CANOpenConversation<>(transaction, field.getNodeId(), conversation);
 
         PlcValue writeValue = writeRequest.getPlcValues().get(0);
 
@@ -180,6 +186,7 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl
             download.execute((value, error) -> {
                 String fieldName = writeRequest.getFieldNames().iterator().next();
                 response.complete(new DefaultPlcWriteResponse(writeRequest, Collections.singletonMap(fieldName, PlcResponseCode.OK)));
+                transaction.endRequest();
             });
         } catch (Exception e) {
             response.completeExceptionally(e);
@@ -191,10 +198,10 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl
 
         try {
             String fieldName = writeRequest.getFieldNames().iterator().next();
-            //
-            WriteBuffer buffer = DataItemIO.staticSerialize(writeValue, field.getCanOpenDataType(), writeValue.getLength() / 8, true);
+            WriteBuffer buffer = DataItemIO.staticSerialize(writeValue, field.getCanOpenDataType(), writeValue.getLength(), true);
             if (buffer != null) {
-                context.sendToWire(new SocketCANFrame(field.getNodeId(), buffer.getData()));
+                int cob = field.getService().getMin() + field.getNodeId();
+                context.sendToWire(new SocketCANFrame(cob, buffer.getData()));
                 response.complete(new DefaultPlcWriteResponse(writeRequest, Collections.singletonMap(fieldName, PlcResponseCode.OK)));
             } else {
                 response.complete(new DefaultPlcWriteResponse(writeRequest, Collections.singletonMap(fieldName, PlcResponseCode.INVALID_DATA)));
@@ -252,16 +259,27 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl
     }
 
     private void readInternally(InternalPlcReadRequest readRequest, CANOpenSDOField field, CompletableFuture<PlcReadResponse> response) {
-        CANOpenConversation<CANFrame> canopen = new CANOpenConversation<>(field.getNodeId(), conversation);
-
-        SDOUploadConversation<CANFrame> upload = canopen.sdo().upload(new IndexAddress(field.getIndex(), field.getSubIndex()), field.getCanOpenDataType());
         try {
-            upload.execute((value, error) -> {
+            final RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
+            CANOpenConversation<CANFrame> canopen = new CANOpenConversation<>(transaction, field.getNodeId(), conversation);
+            System.out.println("----> Submit read " + field.getIndex() + "/" + field.getSubIndex() + " from " + field.getNodeId() + " " + transaction);
+            SDOUploadConversation<CANFrame> upload = canopen.sdo().upload(new IndexAddress(field.getIndex(), field.getSubIndex()), field.getCanOpenDataType());
+            CompletableFuture<PlcValue> callback = new CompletableFuture<>();
+            callback.whenComplete((value, error) -> {
+                System.out.println("<---- Received reply " + field.getIndex() + "/" + field.getSubIndex() + " from " + field.getNodeId() + " " + value + "/" + error + " " + transaction);
+                if (error != null) {
+                    response.completeExceptionally(error);
+                    transaction.endRequest();
+                    return;
+                }
+
                 String fieldName = readRequest.getFieldNames().iterator().next();
                 Map<String, ResponseItem<PlcValue>> fields = new HashMap<>();
                 fields.put(fieldName, new ResponseItem<>(PlcResponseCode.OK, value));
                 response.complete(new DefaultPlcReadResponse(readRequest, fields));
+                transaction.endRequest();
             });
+            upload.execute(callback);
         } catch (Exception e) {
             response.completeExceptionally(e);
         }
@@ -275,13 +293,23 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl
         CANOpenDriverContext.CALLBACK.receive(msg);
 
         if (service != null) {
-            logger.info("Decoded CANOpen {} from {}, message {}", service, Math.abs(service.getMin() - msg.getIdentifier()), payload);
+            int nodeId = Math.abs(service.getMin() - msg.getIdentifier());
 
             if (service.getPdo() && payload instanceof CANOpenPDOPayload) {
-                logger.info("Broadcasting PDO to subscribers");
-                publishEvent(msg.getIdentifier(), (CANOpenPDOPayload) payload);
+                publishEvent(service, nodeId, (CANOpenPDOPayload) payload);
+            } else {
+                String hex = "";
+                if (logger.isInfoEnabled()) {
+                    try {
+                        final WriteBuffer buffer = new WriteBuffer(payload.getLengthInBytes(), true);
+                        CANOpenPayloadIO.staticSerialize(buffer, payload);
+                        hex = Hex.encodeHexString(buffer.getData());
+                    } catch (ParseException e) {
+                        e.printStackTrace();
+                    }
+                }
+                logger.info("Decoded CANOpen {} from {}, message {}, {}", service, nodeId, payload, hex);
             }
-
         } else {
             logger.info("CAN message {}, {}", msg.getIdentifier(), msg);
         }
@@ -296,7 +324,8 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl
 //        }
     }
 
-    private void publishEvent(int nodeId, CANOpenPDOPayload payload) {
+    private void publishEvent(CANOpenService service, int nodeId, CANOpenPDOPayload payload) {
+        CANOpenSubscriptionHandle dispatchedHandle = null;
         for (Map.Entry<DefaultPlcConsumerRegistration, Consumer<PlcSubscriptionEvent>> entry : consumers.entrySet()) {
             DefaultPlcConsumerRegistration registration = entry.getKey();
             Consumer<PlcSubscriptionEvent> consumer = entry.getValue();
@@ -305,7 +334,10 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl
                 if (handler instanceof CANOpenSubscriptionHandle) {
                     CANOpenSubscriptionHandle handle = (CANOpenSubscriptionHandle) handler;
 
-                    if (handle.matches(nodeId)) {
+                    if (handle.matches(service, nodeId)) {
+                        logger.trace("Dispatching notification {} for node {} to {}", service, nodeId, handle);
+                        dispatchedHandle = handle;
+
                         CANOpenPDOField field = handle.getField();
                         byte[] data = payload.getPdo().getData();
                         try {
@@ -329,10 +361,15 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl
                             );
                             consumer.accept(event);
                         }
+                    } else {
                     }
                 }
             }
         }
+
+        if (dispatchedHandle == null) {
+            logger.trace("Could not find subscription matching {} and node {}", service, nodeId);
+        }
     }
 
     @Override
@@ -362,8 +399,7 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl
 
     private int cobId(int nodeId, CANOpenService service) {
         // form 32 bit socketcan identifier
-        return (nodeId << 24) & 0xff000000 |
-            (service.getValue() << 16 ) & 0x00ff0000;
+        return service.getMin() + nodeId;
     }
 
     private CANOpenService serviceId(int cobId) {
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenSubscriptionHandle.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenSubscriptionHandle.java
index 07ecb3b..9bfd688 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenSubscriptionHandle.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenSubscriptionHandle.java
@@ -1,6 +1,7 @@
 package org.apache.plc4x.java.can.protocol;
 
 import org.apache.plc4x.java.can.field.CANOpenPDOField;
+import org.apache.plc4x.java.canopen.readwrite.types.CANOpenService;
 import org.apache.plc4x.java.spi.messages.PlcSubscriber;
 import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionHandle;
 
@@ -14,8 +15,11 @@ public class CANOpenSubscriptionHandle extends DefaultPlcSubscriptionHandle {
         this.field = field;
     }
 
-    public boolean matches(int identifier) {
-        return field.getNodeId() == 0 || field.getNodeId() == identifier;
+    public boolean matches(CANOpenService service, int identifier) {
+        if (field.getService() != service) {
+            return false;
+        }
+        return field.getNodeId() == identifier;
     }
 
     public String getName() {
@@ -25,4 +29,13 @@ public class CANOpenSubscriptionHandle extends DefaultPlcSubscriptionHandle {
     public CANOpenPDOField getField() {
         return field;
     }
+
+    public String toString() {
+        return "CANOpenSubscriptionHandle [service=" + field.getService() + ", node=" + intAndHex(field.getNodeId()) + ", cob=" + intAndHex(field.getService().getMin() + field.getNodeId()) + "]";
+    }
+
+    private static String intAndHex(int val) {
+        return val + "(0x" + Integer.toHexString(val) + ")";
+    }
+
 }
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/socketcan/SocketCANConversation.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/socketcan/SocketCANConversation.java
index 8b01c2a..c35843d 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/socketcan/SocketCANConversation.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/socketcan/SocketCANConversation.java
@@ -7,7 +7,6 @@ import org.apache.plc4x.java.can.api.conversation.canopen.CANFrameBuilder;
 import org.apache.plc4x.java.socketcan.readwrite.SocketCANFrame;
 import org.apache.plc4x.java.spi.ConversationContext;
 import org.apache.plc4x.java.spi.ConversationContext.SendRequestContext;
-import org.apache.plc4x.java.spi.transaction.RequestTransactionManager;
 import org.apache.plc4x.java.spi.transaction.RequestTransactionManager.RequestTransaction;
 
 import java.time.Duration;
@@ -15,32 +14,35 @@ import java.util.function.BiConsumer;
 
 public class SocketCANConversation implements CANConversation<CANFrame> {
 
-    private final RequestTransactionManager tm;
+    private final int nodeId;
     private final ConversationContext<SocketCANFrame> context;
 
-    public SocketCANConversation(RequestTransactionManager tm, ConversationContext<SocketCANFrame> context) {
-        this.tm = tm;
+    public SocketCANConversation(int nodeId, ConversationContext<SocketCANFrame> context) {
+        this.nodeId = nodeId;
         this.context = context;
     }
 
     @Override
+    public int getNodeId() {
+        return nodeId;
+    }
+
+    @Override
     public CANFrameBuilder<CANFrame> frameBuilder() {
         return new SocketCANFrameBuilder();
     }
 
     @Override
-    public void send(CANFrame frame, BiConsumer<RequestTransaction, SendRequestContext<CANFrame>> callback) {
+    public void send(RequestTransaction transaction, CANFrame frame, BiConsumer<RequestTransaction, SendRequestContext<CANFrame>> callback) {
         if (frame instanceof SocketCANDelegateFrame) {
-            RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
-
-            ConversationContext.SendRequestContext<CANFrame> ctx = context.sendRequest(((SocketCANDelegateFrame) frame).getFrame())
-                .expectResponse(SocketCANFrame.class, Duration.ofSeconds(10L))
-//                .onError((response, error) -> {
-//                    System.err.println("Unexpected frame " + response + " " + error);
-//                })
-                .unwrap(SocketCANDelegateFrame::new);
-            //return CompletableFuture.completedFuture(new SocketCANTransactionContext<>(transaction, ctx));
-            callback.accept(transaction, ctx);
+            System.out.println("-----> Sending request frame " + transaction);
+            transaction.submit(() -> {
+                ConversationContext.SendRequestContext<CANFrame> ctx = context.sendRequest(((SocketCANDelegateFrame) frame).getFrame())
+                    .expectResponse(SocketCANFrame.class, Duration.ofSeconds(10L))
+                    .unwrap(SocketCANDelegateFrame::new);
+                System.out.println("-----> Frame been sent " + transaction);
+                callback.accept(transaction, ctx);
+            });
             return;
         }
         throw new PlcRuntimeException("Unsupported frame type " + frame);