You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by ld...@apache.org on 2020/10/21 08:43:17 UTC

[plc4x] 17/21: CANopen microstone - PDO & subscriptions.

This is an automated email from the ASF dual-hosted git repository.

ldywicki pushed a commit to branch feature/socketcan
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 6aa220e3653e070ba8e8b08567c080a92b17e0ef
Author: Łukasz Dywicki <lu...@code-house.org>
AuthorDate: Wed Oct 7 11:31:43 2020 +0200

    CANopen microstone - PDO & subscriptions.
---
 .../resources/templates/java/data-io-template.ftlh |  19 ++-
 .../org/apache/plc4x/java/api/value/PlcValues.java |   4 +
 .../src/main/resources/protocols/can/canopen.mspec |  40 +++---
 .../apache/plc4x/java/can/CANOpenPlcDriver.java    |   5 +
 .../apache/plc4x/java/can/field/CANOpenField.java  |   2 +
 .../plc4x/java/can/field/CANOpenFieldHandler.java  |  20 ++-
 .../plc4x/java/can/field/CANOpenPDOField.java      |  64 ++++++++++
 .../java/can/protocol/CANOpenProtocolLogic.java    | 137 +++++++++++++++++++--
 .../can/protocol/CANOpenSubscriptionHandle.java    |  28 +++++
 9 files changed, 281 insertions(+), 38 deletions(-)

diff --git a/build-utils/language-java/src/main/resources/templates/java/data-io-template.ftlh b/build-utils/language-java/src/main/resources/templates/java/data-io-template.ftlh
index 568fed9..3f680da 100644
--- a/build-utils/language-java/src/main/resources/templates/java/data-io-template.ftlh
+++ b/build-utils/language-java/src/main/resources/templates/java/data-io-template.ftlh
@@ -119,10 +119,21 @@ public class ${type.name}IO {
                             <#if helper.isLengthArrayField(field)>
             // Length array
             int _${field.name}Length = ${helper.toParseExpression(field, field.loopExpression, type.parserArguments)};
-            List<${helper.getNonPrimitiveLanguageTypeNameForField(field)}> _${field.name}List = new LinkedList<>();
+            List<PlcValue> _${field.name}List = new LinkedList<>();
             int ${field.name}EndPos = io.getPos() + _${field.name}Length;
             while(io.getPos() < ${field.name}EndPos) {
-                _${field.name}List.add(<#if helper.isSimpleTypeReference(field.type)>${helper.getReadBufferReadMethodCall(field.type)}<#else>${field.type.name}IO.staticParse(io<#if field.params?has_content>, <#list field.params as parserArgument>(${helper.getLanguageTypeNameForTypeReference(helper.getArgumentType(field.type, parserArgument?index), true)}) (${helper.toParseExpression(field, parserArgument, type.parserArguments)})<#sep>, </#sep></#list></#if>)</#if>);
+                _${field.name}List.add(
+                    <#if helper.isSimpleTypeReference(field.type)>PlcValues.of(${helper.getReadBufferReadMethodCall(field.type)})
+                    <#else>${field.type.name}IO.staticParse(io
+                        <#if field.params?has_content>,
+                            <#list field.params as parserArgument>
+                                (${helper.getLanguageTypeNameForTypeReference(helper.getArgumentType(field.type, parserArgument?index), true)}) (${helper.toParseExpression(field, parserArgument, type.parserArguments)})
+                                <#sep>, </#sep>
+                            </#list>
+                        </#if>
+                    )
+                    </#if>
+                );
                 <#-- After parsing, update the current position, but only if it's needed -->
                                 <#if field.loopExpression.contains("curPos")>
                 curPos = io.getPos() - startPos;
@@ -145,7 +156,6 @@ public class ${type.name}IO {
                 Convert the list into an array. However if the array is of a primitive
                 type we have to iterate over it's elements and explicitly cast them.
                 Otherwise a simple toArray call is fine.
-            -->
                             <#if helper.isSimpleTypeReference(field.type)>
             ${helper.getNonPrimitiveLanguageTypeNameForField(field)}[] ${field.name} = new ${helper.getNonPrimitiveLanguageTypeNameForField(field)}[_${field.name}List.size()];
             for(int i = 0; i < _${field.name}List.size(); i++) {
@@ -154,6 +164,9 @@ public class ${type.name}IO {
                             <#else>
             ${helper.getNonPrimitiveLanguageTypeNameForField(field)}[] ${field.name} = _${field.name}List.toArray(new ${helper.getNonPrimitiveLanguageTypeNameForField(field)}[0]);
                             </#if>
+
+            -->
+            List<?> value = _${field.name}List;
                         </#if>
                     <#break>
                     <#case "const">
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/value/PlcValues.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/value/PlcValues.java
index 414eb62..03b3761 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/value/PlcValues.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/value/PlcValues.java
@@ -441,6 +441,10 @@ public class PlcValues {
         if(o == null) {
             return new PlcNull();
         }
+        if (o instanceof Byte) {
+            return new PlcBYTE((Byte) o);
+        }
+
         try {
             String simpleName = o.getClass().getSimpleName();
             Class<?> clazz = o.getClass();
diff --git a/protocols/can/src/main/resources/protocols/can/canopen.mspec b/protocols/can/src/main/resources/protocols/can/canopen.mspec
index 3b1e49d..10afdf9 100644
--- a/protocols/can/src/main/resources/protocols/can/canopen.mspec
+++ b/protocols/can/src/main/resources/protocols/can/canopen.mspec
@@ -272,64 +272,64 @@
 
 [dataIo 'DataItem' [CANOpenDataType 'dataType', int 32 'size']
     [typeSwitch 'dataType'
-        ['CANOpenDataType.BOOLEAN' Boolean
+        ['CANOpenDataType.BOOLEAN' BOOL
             [simple bit 'value']
         ]
-        ['CANOpenDataType.UNSIGNED8' Integer
+        ['CANOpenDataType.UNSIGNED8' USINT
             [simple uint 8 'value']
         ]
-        ['CANOpenDataType.UNSIGNED16' Integer
+        ['CANOpenDataType.UNSIGNED16' UINT
             [simple uint 16 'value']
         ]
-        ['CANOpenDataType.UNSIGNED24' Long
+        ['CANOpenDataType.UNSIGNED24' UDINT
             [simple uint 24 'value']
         ]
-        ['CANOpenDataType.UNSIGNED32' Long
+        ['CANOpenDataType.UNSIGNED32' UDINT
             [simple uint 32 'value']
         ]
-        ['CANOpenDataType.UNSIGNED40' BigInteger
+        ['CANOpenDataType.UNSIGNED40' ULINT
             [simple uint 40 'value']
         ]
-        ['CANOpenDataType.UNSIGNED48' BigInteger
+        ['CANOpenDataType.UNSIGNED48' ULINT
             [simple uint 48 'value']
         ]
-        ['CANOpenDataType.UNSIGNED56' BigInteger
+        ['CANOpenDataType.UNSIGNED56' ULINT
             [simple uint 56 'value']
         ]
-        ['CANOpenDataType.UNSIGNED64' BigInteger
+        ['CANOpenDataType.UNSIGNED64' ULINT
             [simple uint 64 'value']
         ]
-        ['CANOpenDataType.INTEGER8' Integer
+        ['CANOpenDataType.INTEGER8' SINT
             [simple int 8 'value']
         ]
-        ['CANOpenDataType.INTEGER16' Integer
+        ['CANOpenDataType.INTEGER16' INT
             [simple int 16 'value']
         ]
-        ['CANOpenDataType.INTEGER24' Integer
+        ['CANOpenDataType.INTEGER24' DINT
             [simple int 24 'value']
         ]
-        ['CANOpenDataType.INTEGER32' Integer
+        ['CANOpenDataType.INTEGER32' DINT
             [simple int 32 'value']
         ]
-        ['CANOpenDataType.INTEGER40' Long
+        ['CANOpenDataType.INTEGER40' LINT
             [simple int 40 'value']
         ]
-        ['CANOpenDataType.INTEGER48' Long
+        ['CANOpenDataType.INTEGER48' LINT
             [simple int 48 'value']
         ]
-        ['CANOpenDataType.INTEGER56' Long
+        ['CANOpenDataType.INTEGER56' LINT
             [simple int 56 'value']
         ]
-        ['CANOpenDataType.INTEGER64' Long
+        ['CANOpenDataType.INTEGER64' LINT
             [simple int 64 'value']
         ]
-        ['CANOpenDataType.REAL32' Float
+        ['CANOpenDataType.REAL32' REAL
             [simple float 8.23 'value']
         ]
-        ['CANOpenDataType.REAL64' Double
+        ['CANOpenDataType.REAL64' LREAL
             [simple float 11.52 'value']
         ]
-        ['CANOpenDataType.RECORD' List
+        ['CANOpenDataType.RECORD' List [int 32 'size']
             [array int 8 'value' length 'size']
         ]
         ['CANOpenDataType.OCTET_STRING' String
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/CANOpenPlcDriver.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/CANOpenPlcDriver.java
index a8a5f7a..c34806f 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/CANOpenPlcDriver.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/CANOpenPlcDriver.java
@@ -58,6 +58,11 @@ public class CANOpenPlcDriver extends GeneratedDriverBase<SocketCANFrame> {
     }
 
     @Override
+    protected boolean canSubscribe() {
+        return true;
+    }
+
+    @Override
     protected boolean canWrite() {
         return true;
     }
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/field/CANOpenField.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/field/CANOpenField.java
index 6458848..6917351 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/field/CANOpenField.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/field/CANOpenField.java
@@ -44,6 +44,8 @@ public abstract class CANOpenField implements PlcField {
     public static CANOpenField of(String addressString) throws PlcInvalidFieldException {
         if (CANOpenSDOField.matches(addressString)) {
             return CANOpenSDOField.of(addressString);
+        } else if (CANOpenPDOField.matches(addressString)) {
+            return CANOpenPDOField.of(addressString);
         }
 
         throw new PlcInvalidFieldException("Unable to parse address: " + addressString);
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/field/CANOpenFieldHandler.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/field/CANOpenFieldHandler.java
index cf37533..cb61195 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/field/CANOpenFieldHandler.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/field/CANOpenFieldHandler.java
@@ -21,13 +21,13 @@ package org.apache.plc4x.java.can.field;
 import org.apache.plc4x.java.api.exceptions.PlcInvalidFieldException;
 import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
 import org.apache.plc4x.java.api.model.PlcField;
-import org.apache.plc4x.java.api.value.PlcList;
-import org.apache.plc4x.java.api.value.PlcString;
-import org.apache.plc4x.java.api.value.PlcValue;
+import org.apache.plc4x.java.api.value.*;
 import org.apache.plc4x.java.spi.connection.DefaultPlcFieldHandler;
 import org.apache.plc4x.java.spi.connection.PlcFieldHandler;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.stream.Collectors;
 
 public class CANOpenFieldHandler extends DefaultPlcFieldHandler implements PlcFieldHandler {
@@ -55,4 +55,18 @@ public class CANOpenFieldHandler extends DefaultPlcFieldHandler implements PlcFi
 
         throw new PlcRuntimeException("Invalid encoder for type " + coField.getCanOpenDataType().name());
     }
+
+    /**
+     * Encodes raw values as PLC byte values.
+     *
+     * @param field  the target field; not consulted by this implementation
+     * @param values raw values; each element is cast to {@link Byte}, so any other
+     *               element type fails with a {@link ClassCastException}
+     * @return the converted value itself when exactly one value was supplied,
+     *         otherwise a {@link PlcList} holding all converted values (possibly empty)
+     */
+    @Override
+    public PlcValue encodeByte(PlcField field, Object[] values) {
+        List<PlcValue> converted = new ArrayList<>();
+        for (int i = 0; i < values.length; i++) {
+            converted.add(PlcValues.of((Byte) values[i]));
+        }
+
+        // Zero or several values are always reported as a list.
+        if (converted.size() != 1) {
+            return new PlcList(converted);
+        }
+        return converted.get(0);
+    }
 }
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/field/CANOpenPDOField.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/field/CANOpenPDOField.java
new file mode 100644
index 0000000..f65e5b3
--- /dev/null
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/field/CANOpenPDOField.java
@@ -0,0 +1,64 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.java.can.field;
+
+import org.apache.plc4x.java.api.exceptions.PlcInvalidFieldException;
+import org.apache.plc4x.java.canopen.readwrite.types.CANOpenDataType;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * PLC field addressing a CANopen PDO, written as {@code PDO:<node>:<dataType>} with an
+ * optional {@code [n]} suffix.
+ *
+ * NOTE(review): the {@code numberOfElements} group is accepted by the pattern but is not
+ * read anywhere in this class - confirm whether it should be carried on the field.
+ */
+public class CANOpenPDOField extends CANOpenField {
+
+    /** Address syntax; the node part is contributed by {@link CANOpenField#NODE_PATTERN}. */
+    public static final Pattern ADDRESS_PATTERN = Pattern.compile("PDO:" + CANOpenField.NODE_PATTERN + ":(?<canDataType>\\w+)(\\[(?<numberOfElements>\\d)])?");
+    private final CANOpenDataType canOpenDataType;
+
+    /**
+     * @param node            node id handed to the {@link CANOpenField} base class
+     * @param canOpenDataType data type used to encode/decode the PDO payload
+     */
+    public CANOpenPDOField(int node, CANOpenDataType canOpenDataType) {
+        super(node);
+        this.canOpenDataType = canOpenDataType;
+    }
+
+    /** @return the data type this field was declared with */
+    public CANOpenDataType getCanOpenDataType() {
+        return canOpenDataType;
+    }
+
+    /**
+     * @param addressString candidate address
+     * @return {@code true} when the whole string matches {@link #ADDRESS_PATTERN}
+     */
+    public static boolean matches(String addressString) {
+        return ADDRESS_PATTERN.matcher(addressString).matches();
+    }
+
+    /**
+     * Matches the address against {@link #ADDRESS_PATTERN}.
+     *
+     * @param addressString candidate address
+     * @return a matcher that has already matched the whole string
+     * @throws PlcInvalidFieldException when the string does not match the pattern
+     */
+    public static Matcher getMatcher(String addressString) throws PlcInvalidFieldException {
+        Matcher matcher = ADDRESS_PATTERN.matcher(addressString);
+        if (matcher.matches()) {
+            return matcher;
+        }
+
+        throw new PlcInvalidFieldException(addressString, ADDRESS_PATTERN);
+    }
+
+    /**
+     * Parses a PDO address into a field.
+     *
+     * @param addressString address to parse
+     * @return the parsed field
+     * @throws PlcInvalidFieldException when the address does not match the pattern or
+     *                                  names a data type that is not a {@link CANOpenDataType} constant
+     */
+    public static CANOpenPDOField of(String addressString) {
+        Matcher matcher = getMatcher(addressString);
+        int nodeId = Integer.parseInt(matcher.group("nodeId"));
+
+        String canDataTypeString = matcher.group("canDataType");
+        CANOpenDataType canOpenDataType;
+        try {
+            canOpenDataType = CANOpenDataType.valueOf(canDataTypeString);
+        } catch (IllegalArgumentException e) {
+            // The pattern accepts any word as data type, so an unknown name only surfaces here.
+            // Report it with the exception type address parsing advertises instead of leaking
+            // the raw IllegalArgumentException from valueOf.
+            throw new PlcInvalidFieldException("Unknown CANopen data type '" + canDataTypeString + "' in address: " + addressString);
+        }
+
+        return new CANOpenPDOField(nodeId, canOpenDataType);
+    }
+
+}
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenProtocolLogic.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenProtocolLogic.java
index b5aa34e..d96fe19 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenProtocolLogic.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenProtocolLogic.java
@@ -19,8 +19,12 @@ under the License.
 package org.apache.plc4x.java.can.protocol;
 
 import org.apache.plc4x.java.api.messages.*;
+import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
 import org.apache.plc4x.java.api.model.PlcField;
+import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
 import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.apache.plc4x.java.api.types.PlcSubscriptionType;
+import org.apache.plc4x.java.api.value.PlcList;
 import org.apache.plc4x.java.api.value.PlcNull;
 import org.apache.plc4x.java.api.value.PlcValue;
 import org.apache.plc4x.java.can.api.CANFrame;
@@ -31,6 +35,7 @@ import org.apache.plc4x.java.can.api.conversation.canopen.SDOUploadConversation;
 import org.apache.plc4x.java.can.configuration.CANConfiguration;
 import org.apache.plc4x.java.can.context.CANOpenDriverContext;
 import org.apache.plc4x.java.can.field.CANOpenField;
+import org.apache.plc4x.java.can.field.CANOpenPDOField;
 import org.apache.plc4x.java.can.field.CANOpenSDOField;
 import org.apache.plc4x.java.can.socketcan.SocketCANConversation;
 import org.apache.plc4x.java.canopen.readwrite.*;
@@ -49,18 +54,28 @@ import org.apache.plc4x.java.spi.generation.ReadBuffer;
 import org.apache.plc4x.java.spi.generation.WriteBuffer;
 import org.apache.plc4x.java.spi.messages.*;
 import org.apache.plc4x.java.spi.messages.utils.ResponseItem;
+import org.apache.plc4x.java.spi.model.DefaultPlcConsumerRegistration;
+import org.apache.plc4x.java.spi.model.InternalPlcSubscriptionHandle;
+import org.apache.plc4x.java.spi.model.SubscriptionPlcField;
 import org.apache.plc4x.java.spi.transaction.RequestTransactionManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
 
-public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> implements HasConfiguration<CANConfiguration> {
+public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> implements HasConfiguration<CANConfiguration>, PlcSubscriber {
 
     private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(10L);
     private Logger logger = LoggerFactory.getLogger(CANOpenProtocolLogic.class);
@@ -71,6 +86,8 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl
     private CANOpenDriverContext canContext;
     private CANConversation<CANFrame> conversation;
 
+    private Map<DefaultPlcConsumerRegistration, Consumer<PlcSubscriptionEvent>> consumers = new ConcurrentHashMap<>();
+
     @Override
     public void setConfiguration(CANConfiguration configuration) {
         this.configuration = configuration;
@@ -130,7 +147,7 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl
     public CompletableFuture<PlcWriteResponse> write(PlcWriteRequest writeRequest) {
         CompletableFuture<PlcWriteResponse> response = new CompletableFuture<>();
         if (writeRequest.getFieldNames().size() != 1) {
-            response.completeExceptionally(new IllegalArgumentException("Unsupported field"));
+            response.completeExceptionally(new IllegalArgumentException("You can write only one field at the time"));
             return response;
         }
 
@@ -140,12 +157,16 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl
             return response;
         }
 
-        if (!(field instanceof CANOpenSDOField)) {
-            response.completeExceptionally(new IllegalArgumentException("Only CANOpenSDOField instances are supported"));
+        if (field instanceof CANOpenSDOField) {
+            writeInternally((InternalPlcWriteRequest) writeRequest, (CANOpenSDOField) field, response);
             return response;
-        };
+        }
+        if (field instanceof CANOpenPDOField) {
+            writeInternally((InternalPlcWriteRequest) writeRequest, (CANOpenPDOField) field, response);
+            return response;
+        }
 
-        writeInternally((InternalPlcWriteRequest) writeRequest, (CANOpenSDOField) field, response);
+        response.completeExceptionally(new IllegalArgumentException("Only CANOpenSDOField instances are supported"));
         return response;
     }
 
@@ -158,15 +179,31 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl
         try {
             download.execute((value, error) -> {
                 String fieldName = writeRequest.getFieldNames().iterator().next();
-                Map<String, PlcResponseCode> fields = new HashMap<>();
-                fields.put(fieldName, PlcResponseCode.OK);
-                response.complete(new DefaultPlcWriteResponse(writeRequest, fields));
+                response.complete(new DefaultPlcWriteResponse(writeRequest, Collections.singletonMap(fieldName, PlcResponseCode.OK)));
             });
         } catch (Exception e) {
             response.completeExceptionally(e);
         }
     }
 
+    /**
+     * Writes a PDO: serializes the first value of the request using the field's data type
+     * and pushes it to the wire as one frame addressed with the field's node id.
+     *
+     * The response is completed with OK as soon as the frame has been handed to
+     * {@code sendToWire}; no reply is awaited. A {@code null} serialization result is reported
+     * as INVALID_DATA, and any exception raised inside the try block completes the future
+     * exceptionally.
+     *
+     * @param writeRequest request whose first value and first field name are used
+     * @param field        PDO field supplying node id and data type
+     * @param response     future completed with the outcome
+     */
+    private void writeInternally(InternalPlcWriteRequest writeRequest, CANOpenPDOField field, CompletableFuture<PlcWriteResponse> response) {
+        PlcValue payload = writeRequest.getPlcValues().get(0);
+
+        try {
+            String name = writeRequest.getFieldNames().iterator().next();
+            // NOTE(review): the size argument is computed as getLength() / 8 - confirm which unit
+            // DataItemIO.staticSerialize expects here.
+            WriteBuffer serialized = DataItemIO.staticSerialize(payload, field.getCanOpenDataType(), payload.getLength() / 8, true);
+
+            PlcResponseCode code;
+            if (serialized == null) {
+                code = PlcResponseCode.INVALID_DATA;
+            } else {
+                context.sendToWire(new SocketCANFrame(field.getNodeId(), serialized.getData()));
+                code = PlcResponseCode.OK;
+            }
+            response.complete(new DefaultPlcWriteResponse(writeRequest, Collections.singletonMap(name, code)));
+        } catch (Exception e) {
+            response.completeExceptionally(e);
+        }
+    }
+
     public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) {
         CompletableFuture<PlcReadResponse> response = new CompletableFuture<>();
         if (readRequest.getFieldNames().size() != 1) {
@@ -190,9 +227,28 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl
     }
 
     @Override
-    public CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionRequest subscriptionRequest) {
-        ((InternalPlcSubscriptionRequest) subscriptionRequest).getSubscriptionFields().get(0).getPlcSubscriptionType();
-        return super.subscribe(subscriptionRequest);
+    public CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionRequest request) {
+        InternalPlcSubscriptionRequest rq = (InternalPlcSubscriptionRequest) request;
+
+        List<SubscriptionPlcField> fields = rq.getSubscriptionFields();
+
+        Map<String, ResponseItem<PlcSubscriptionHandle>> answers = new LinkedHashMap<>();
+        DefaultPlcSubscriptionResponse response = new DefaultPlcSubscriptionResponse(rq, answers);
+
+        for (Map.Entry<String, SubscriptionPlcField> entry : rq.getSubscriptionPlcFieldMap().entrySet()) {
+            SubscriptionPlcField subscription = entry.getValue();
+            if (subscription.getPlcSubscriptionType() != PlcSubscriptionType.EVENT) {
+                answers.put(entry.getKey(), new ResponseItem<>(PlcResponseCode.UNSUPPORTED, null));
+            } else if (!(subscription.getPlcField() instanceof CANOpenPDOField)) {
+                answers.put(entry.getKey(), new ResponseItem<>(PlcResponseCode.INVALID_ADDRESS, null));
+            } else {
+                answers.put(entry.getKey(), new ResponseItem<>(PlcResponseCode.OK,
+                    new CANOpenSubscriptionHandle(this, entry.getKey(), (CANOpenPDOField) subscription.getPlcField())
+                ));
+            }
+        }
+
+        return CompletableFuture.completedFuture(response);
     }
 
     private void readInternally(InternalPlcReadRequest readRequest, CANOpenSDOField field, CompletableFuture<PlcReadResponse> response) {
@@ -220,6 +276,12 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl
 
         if (service != null) {
             logger.info("Decoded CANOpen {} from {}, message {}", service, Math.abs(service.getMin() - msg.getIdentifier()), payload);
+
+            if (service.getPdo() && payload instanceof CANOpenPDOPayload) {
+                logger.info("Broadcasting PDO to subscribers");
+                publishEvent(msg.getIdentifier(), (CANOpenPDOPayload) payload);
+            }
+
         } else {
             logger.info("CAN message {}, {}", msg.getIdentifier(), msg);
         }
@@ -234,6 +296,57 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl
 //        }
     }
 
+    /**
+     * Hands a received PDO payload to every registered consumer that holds a matching
+     * {@link CANOpenSubscriptionHandle}.
+     *
+     * For each matching handle the payload bytes are parsed with the data type of the handle's
+     * field. A successful parse is delivered with response code OK; a {@link ParseException}
+     * is logged and delivered as INVALID_DATA carrying a {@link PlcNull}. One event is emitted
+     * per matching handle, keyed by the handle's name.
+     *
+     * @param nodeId  identifier the handles are matched against
+     * @param payload PDO payload whose data bytes are decoded
+     */
+    private void publishEvent(int nodeId, CANOpenPDOPayload payload) {
+        for (Map.Entry<DefaultPlcConsumerRegistration, Consumer<PlcSubscriptionEvent>> entry : consumers.entrySet()) {
+            Consumer<PlcSubscriptionEvent> consumer = entry.getValue();
+
+            for (InternalPlcSubscriptionHandle candidate : entry.getKey().getAssociatedHandles()) {
+                // Only handles created by this driver carry the field needed for decoding.
+                if (!(candidate instanceof CANOpenSubscriptionHandle)) {
+                    continue;
+                }
+                CANOpenSubscriptionHandle handle = (CANOpenSubscriptionHandle) candidate;
+                if (!handle.matches(nodeId)) {
+                    continue;
+                }
+
+                CANOpenPDOField field = handle.getField();
+                byte[] data = payload.getPdo().getData();
+                ResponseItem<PlcValue> item;
+                try {
+                    PlcValue value = DataItemIO.staticParse(new ReadBuffer(data, true), field.getCanOpenDataType(), data.length);
+                    item = new ResponseItem<>(PlcResponseCode.OK, value);
+                } catch (ParseException e) {
+                    logger.warn("Could not parse data to desired type: {}", field.getCanOpenDataType(), e);
+                    item = new ResponseItem<>(PlcResponseCode.INVALID_DATA, new PlcNull());
+                }
+
+                // Both outcomes are delivered the same way; only the response item differs.
+                consumer.accept(new DefaultPlcSubscriptionEvent(
+                    Instant.now(),
+                    Collections.singletonMap(handle.getName(), item)
+                ));
+            }
+        }
+    }
+
+    /**
+     * Registers a consumer for the given subscription handles and remembers it so that
+     * incoming PDOs can be delivered to it.
+     *
+     * @param consumer callback receiving subscription events
+     * @param handles  handles the consumer is interested in; they are copied into an
+     *                 {@link InternalPlcSubscriptionHandle} array
+     * @return the registration under which the consumer is stored
+     */
+    @Override
+    public PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer, Collection<PlcSubscriptionHandle> handles) {
+        InternalPlcSubscriptionHandle[] internalHandles = handles.toArray(new InternalPlcSubscriptionHandle[0]);
+        DefaultPlcConsumerRegistration registration = new DefaultPlcConsumerRegistration(this, consumer, internalHandles);
+        consumers.put(registration, consumer);
+        return registration;
+    }
+
+    /**
+     * Drops the consumer stored under the given registration.
+     *
+     * @param registration registration previously returned by {@code register}
+     */
+    @Override
+    public void unregister(PlcConsumerRegistration registration) {
+        consumers.remove(registration);
+    }
+
     @Override
     public void close(ConversationContext<SocketCANFrame> context) {
 
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenSubscriptionHandle.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenSubscriptionHandle.java
new file mode 100644
index 0000000..07ecb3b
--- /dev/null
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenSubscriptionHandle.java
@@ -0,0 +1,28 @@
+package org.apache.plc4x.java.can.protocol;
+
+import org.apache.plc4x.java.can.field.CANOpenPDOField;
+import org.apache.plc4x.java.spi.messages.PlcSubscriber;
+import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionHandle;
+
+/**
+ * Subscription handle binding a subscription name to the {@link CANOpenPDOField} it was
+ * created for.
+ */
+public class CANOpenSubscriptionHandle extends DefaultPlcSubscriptionHandle {
+    private final String name;
+    private final CANOpenPDOField field;
+
+    /**
+     * @param subscriber subscriber passed on to the base handle
+     * @param name       name under which events for this handle are reported
+     * @param field      PDO field this handle listens to
+     */
+    public CANOpenSubscriptionHandle(PlcSubscriber subscriber, String name, CANOpenPDOField field) {
+        super(subscriber);
+        this.name = name;
+        this.field = field;
+    }
+
+    /**
+     * Tells whether a frame with the given identifier belongs to this handle.
+     *
+     * @param identifier identifier to compare with the field's node id
+     * @return {@code true} when the field's node id is 0 or equals {@code identifier}
+     */
+    public boolean matches(int identifier) {
+        int node = field.getNodeId();
+        // A node id of 0 accepts every identifier.
+        return node == 0 || node == identifier;
+    }
+
+    /** @return the name given at construction */
+    public String getName() {
+        return name;
+    }
+
+    /** @return the PDO field given at construction */
+    public CANOpenPDOField getField() {
+        return field;
+    }
+}