You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by ld...@apache.org on 2020/11/12 00:42:27 UTC

[plc4x] 24/26: Update (simplify) conversation logic, introduce timeouts and fix string serialization.

This is an automated email from the ASF dual-hosted git repository.

ldywicki pushed a commit to branch feature/socketcan
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 844f772f35b13f18f70915b47c323ab59bfc8f4f
Author: Łukasz Dywicki <lu...@code-house.org>
AuthorDate: Fri Nov 6 11:26:23 2020 +0100

    Update (simplify) conversation logic, introduce timeouts and fix string serialization.
---
 .../socketcan/netty/SocketCANChannel.java          | 19 ++++-
 .../src/main/resources/protocols/can/canopen.mspec |  4 +-
 .../api/conversation/canopen/CANConversation.java  |  2 +-
 .../conversation/canopen/CANOpenConversation.java  | 41 ----------
 .../canopen/CANOpenConversationBase.java           | 23 ++++++
 .../api/conversation/canopen/SDOConversation.java  | 44 -----------
 .../canopen/SDODownloadConversation.java           | 92 +++++++++++++---------
 .../canopen/SDOUploadConversation.java             | 75 ++++++++++--------
 .../java/can/configuration/CANConfiguration.java   | 13 +++
 .../java/can/protocol/CANOpenProtocolLogic.java    |  9 +--
 .../java/can/socketcan/SocketCANConversation.java  | 12 +--
 11 files changed, 159 insertions(+), 175 deletions(-)

diff --git a/plc4j/transports/socketcan/src/main/java/org/apache/plc4x/java/transport/socketcan/netty/SocketCANChannel.java b/plc4j/transports/socketcan/src/main/java/org/apache/plc4x/java/transport/socketcan/netty/SocketCANChannel.java
index f141e9e..afa4855 100644
--- a/plc4j/transports/socketcan/src/main/java/org/apache/plc4x/java/transport/socketcan/netty/SocketCANChannel.java
+++ b/plc4j/transports/socketcan/src/main/java/org/apache/plc4x/java/transport/socketcan/netty/SocketCANChannel.java
@@ -26,6 +26,7 @@ import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelConfig;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoop;
 import io.netty.channel.oio.OioByteStreamChannel;
 import org.apache.plc4x.java.transport.socketcan.netty.address.SocketCANAddress;
 import org.slf4j.Logger;
@@ -115,16 +116,19 @@ public class SocketCANChannel extends OioByteStreamChannel {
                 while (!isInputShutdown()) {
                     ByteBuffer byteBuffer = ByteBuffer.allocateDirect(16);
                     handle.readUnsafe(byteBuffer);
+                    buffer.writeBytes(byteBuffer);
+//                    CanFrame frame = handle.read();
+//                    System.out.println("Read frame " + frame);
 //                    frameBytes.writeBytes(frame.getBuffer());
 //                    String dump = ByteBufUtil.prettyHexDump(frameBytes);
 //                    System.out.println(frame + "\n" + dump);
-                    buffer.writeBytes(byteBuffer);
+//                    buffer.writeBytes(frame.getBuffer());
                 }
             } catch (IOException e) {
                 logger.warn("Could not read data", e);
                 pipeline().fireExceptionCaught(e);
             }
-        });
+        }, "javacan-reader");
         loopThread.start();
 
         activate(new CANInputStream(buffer), new CANOutputStream(handle));
@@ -170,6 +174,11 @@ public class SocketCANChannel extends OioByteStreamChannel {
     }
 
     @Override
+    protected boolean isCompatible(EventLoop loop) {
+        return super.isCompatible(loop);
+    }
+
+    @Override
     public ChannelConfig config() {
         return this.config;
     }
@@ -222,9 +231,15 @@ public class SocketCANChannel extends OioByteStreamChannel {
                 if (buf.readableBytes() > 0) {
                     return buf.readByte() & 0xFF;
                 }
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    throw new IOException(e);
+                }
             }
             throw new SocketTimeoutException();
         }
+
     }
 
 
diff --git a/protocols/can/src/main/resources/protocols/can/canopen.mspec b/protocols/can/src/main/resources/protocols/can/canopen.mspec
index 7886b88..1cb60fd 100644
--- a/protocols/can/src/main/resources/protocols/can/canopen.mspec
+++ b/protocols/can/src/main/resources/protocols/can/canopen.mspec
@@ -335,13 +335,13 @@
         ['CANOpenDataType.OCTET_STRING' String
             [manual string 'UTF-8' 'value'
                 'STATIC_CALL("org.apache.plc4x.java.can.helper.CANOpenHelper.parseString", io, size, _type.encoding)'
-                'STATIC_CALL("org.apache.plc4x.java.can.helper.CANOpenHelper.serializeString", io, _value, _type.encoding)' '_value.length'
+                'STATIC_CALL("org.apache.plc4x.java.can.helper.CANOpenHelper.serializeString", io, _value, _type.encoding)' '_value.length * 8'
             ]
         ]
         ['CANOpenDataType.VISIBLE_STRING' String
             [manual string 'UTF-8' 'value'
                 'STATIC_CALL("org.apache.plc4x.java.can.helper.CANOpenHelper.parseString", io, size, _type.encoding)'
-                'STATIC_CALL("org.apache.plc4x.java.can.helper.CANOpenHelper.serializeString", io, _value, _type.encoding)' '_value.length'
+                'STATIC_CALL("org.apache.plc4x.java.can.helper.CANOpenHelper.serializeString", io, _value, _type.encoding)' '_value.length * 8'
             ]
         ]
         //CANOpenDataType.TIME_OF_DAY' CANOpenTime
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANConversation.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANConversation.java
index 471e8a6..67dd41b 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANConversation.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANConversation.java
@@ -12,7 +12,7 @@ public interface CANConversation<W extends CANOpenFrame> {
 
     CANOpenFrameBuilder createBuilder();
 
-    void send(W frame, Consumer<SendRequestContext<W>> callback);
+    SendRequestContext<W> send(W frame);
 
 }
 
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversation.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversation.java
deleted file mode 100644
index 9785fee..0000000
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversation.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package org.apache.plc4x.java.can.api.conversation.canopen;
-
-import org.apache.plc4x.java.can.canopen.CANOpenFrame;
-import org.apache.plc4x.java.canopen.readwrite.CANOpenPayload;
-import org.apache.plc4x.java.canopen.readwrite.types.CANOpenService;
-import org.apache.plc4x.java.spi.ConversationContext.SendRequestContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.function.Consumer;
-
-public class CANOpenConversation {
-
-    private final Logger logger = LoggerFactory.getLogger(CANOpenConversation.class);
-    private final int node;
-    private final CANConversation<CANOpenFrame> delegate;
-
-    public CANOpenConversation(int node, CANConversation<CANOpenFrame> delegate) {
-        this.node = node;
-        this.delegate = delegate;
-    }
-
-    public SDOConversation sdo() {
-        return new SDOConversation(this);
-    }
-
-    public void send(CANOpenService service, CANOpenPayload payload, Consumer<SendRequestContext<CANOpenPayload>> callback) {
-        CANOpenFrame frame = delegate.createBuilder().withNodeId(node).withService(service).withPayload(payload).build();
-        delegate.send(frame, (ctx) -> {
-            SendRequestContext<CANOpenPayload> unwrap = ctx
-//                .onError((response, error) -> {
-//                    System.err.println("Unexpected frame " + response + " " + error);
-//                })
-            .unwrap(CANOpenFrame::getPayload);
-            callback.accept(unwrap);
-        });
-
-
-    }
-
-}
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversationBase.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversationBase.java
index 0c65988..28a6941 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversationBase.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversationBase.java
@@ -2,15 +2,38 @@ package org.apache.plc4x.java.can.api.conversation.canopen;
 
 import org.apache.plc4x.java.api.value.PlcValue;
 import org.apache.plc4x.java.can.canopen.CANOpenFrame;
+import org.apache.plc4x.java.canopen.readwrite.CANOpenSDORequest;
+import org.apache.plc4x.java.canopen.readwrite.SDORequest;
 import org.apache.plc4x.java.canopen.readwrite.io.DataItemIO;
 import org.apache.plc4x.java.canopen.readwrite.types.CANOpenDataType;
+import org.apache.plc4x.java.canopen.readwrite.types.CANOpenService;
 import org.apache.plc4x.java.spi.generation.ParseException;
 import org.apache.plc4x.java.spi.generation.ReadBuffer;
 
 public abstract class CANOpenConversationBase {
 
+    protected final CANConversation<CANOpenFrame> delegate;
+    protected final int nodeId;
+
+    public CANOpenConversationBase(CANConversation<CANOpenFrame> delegate, int nodeId) {
+        this.delegate = delegate;
+        this.nodeId = nodeId;
+    }
+
     protected PlcValue decodeFrom(byte[] data, CANOpenDataType type, int length) throws ParseException {
         return DataItemIO.staticParse(new ReadBuffer(data, true), type, length);
     }
 
+    protected boolean isTransmitSDOFromReceiver(CANOpenFrame frame) {
+        return frame.getNodeId() == nodeId && frame.getService() == CANOpenService.TRANSMIT_SDO;
+    }
+
+    protected CANOpenFrame createFrame(SDORequest rq) {
+        return delegate.createBuilder()
+            .withNodeId(nodeId)
+            .withService(CANOpenService.RECEIVE_SDO)
+            .withPayload(new CANOpenSDORequest(rq.getCommand(), rq))
+            .build();
+    }
+
 }
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOConversation.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOConversation.java
deleted file mode 100644
index 251e4da..0000000
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOConversation.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package org.apache.plc4x.java.can.api.conversation.canopen;
-
-import org.apache.plc4x.java.api.value.PlcValue;
-import org.apache.plc4x.java.can.canopen.CANOpenFrame;
-import org.apache.plc4x.java.canopen.readwrite.CANOpenSDORequest;
-import org.apache.plc4x.java.canopen.readwrite.CANOpenSDOResponse;
-import org.apache.plc4x.java.canopen.readwrite.IndexAddress;
-import org.apache.plc4x.java.canopen.readwrite.SDORequest;
-import org.apache.plc4x.java.canopen.readwrite.types.CANOpenDataType;
-import org.apache.plc4x.java.canopen.readwrite.types.CANOpenService;
-import org.apache.plc4x.java.spi.ConversationContext.SendRequestContext;
-import org.apache.plc4x.java.spi.transaction.RequestTransactionManager.RequestTransaction;
-
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-
-public class SDOConversation {
-
-    private final CANOpenConversation delegate;
-
-    public SDOConversation(CANOpenConversation delegate) {
-        this.delegate = delegate;
-    }
-
-    public SDODownloadConversation download(IndexAddress indexAddress, PlcValue value, CANOpenDataType type) {
-        return new SDODownloadConversation(this, indexAddress, value, type);
-    }
-
-    public SDOUploadConversation upload(IndexAddress indexAddress, CANOpenDataType type) {
-        return new SDOUploadConversation(this, indexAddress, type);
-    }
-
-    public void send(SDORequest request, Consumer<SendRequestContext<CANOpenSDOResponse>> callback) {
-        delegate.send(CANOpenService.RECEIVE_SDO, new CANOpenSDORequest(request.getCommand(), request), (ctx) -> {
-            SendRequestContext<CANOpenSDOResponse> context = ctx
-//            .onError((response, error) -> {
-//                System.out.println("Unexpected frame " + response + " " + error);
-//            })
-            .only(CANOpenSDOResponse.class);
-            callback.accept(context);
-        });
-    }
-
-}
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDODownloadConversation.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDODownloadConversation.java
index 8e90bb6..ba0bffb 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDODownloadConversation.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDODownloadConversation.java
@@ -4,22 +4,24 @@ import org.apache.plc4x.java.api.exceptions.PlcException;
 import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
 import org.apache.plc4x.java.api.types.PlcResponseCode;
 import org.apache.plc4x.java.api.value.PlcValue;
+import org.apache.plc4x.java.can.canopen.CANOpenFrame;
 import org.apache.plc4x.java.canopen.readwrite.*;
 import org.apache.plc4x.java.canopen.readwrite.io.DataItemIO;
 import org.apache.plc4x.java.canopen.readwrite.types.CANOpenDataType;
+import org.apache.plc4x.java.canopen.readwrite.types.CANOpenService;
 import org.apache.plc4x.java.canopen.readwrite.types.SDOResponseCommand;
 import org.apache.plc4x.java.spi.generation.ParseException;
 
 import java.util.concurrent.CompletableFuture;
-import java.util.function.BiConsumer;
 
 public class SDODownloadConversation extends CANOpenConversationBase {
 
-    private final SDOConversation delegate;
+    private final CANConversation<CANOpenFrame> delegate;
     private final IndexAddress indexAddress;
     private final byte[] data;
 
-    public SDODownloadConversation(SDOConversation delegate, IndexAddress indexAddress, PlcValue value, CANOpenDataType type) {
+    public SDODownloadConversation(CANConversation<CANOpenFrame> delegate, int nodeId, IndexAddress indexAddress, PlcValue value, CANOpenDataType type) {
+        super(delegate, nodeId);
         this.delegate = delegate;
         this.indexAddress = indexAddress;
 
@@ -33,17 +35,20 @@ public class SDODownloadConversation extends CANOpenConversationBase {
     public void execute(CompletableFuture<PlcResponseCode> receiver) {
         if (data.length > 4) {
             // segmented
-
             SDOInitiateSegmentedUploadResponse size = new SDOInitiateSegmentedUploadResponse(data.length);
-            delegate.send(new SDOInitiateDownloadRequest(false, true, indexAddress, size), (ctx) -> {
-                ctx.unwrap(CANOpenSDOResponse::getResponse)
-                    .check(p -> p.getCommand() == SDOResponseCommand.INITIATE_DOWNLOAD)
-                    .only(SDOInitiateDownloadResponse.class)
-                    .check(p -> indexAddress.equals(p.getAddress()))
-                    .handle(x -> {
-                        put(data, receiver, false, 0);
-                    });
-            });
+            delegate.send(createFrame(new SDOInitiateDownloadRequest(false, true, indexAddress, size)))
+                .check(this::isTransmitSDOFromReceiver)
+                .onTimeout(receiver::completeExceptionally)
+                .onError((response, error) -> receiver.completeExceptionally(error))
+                .unwrap(CANOpenFrame::getPayload)
+                .only(CANOpenSDOResponse.class)
+                .unwrap(CANOpenSDOResponse::getResponse)
+                .check(p -> p.getCommand() == SDOResponseCommand.INITIATE_DOWNLOAD)
+                .only(SDOInitiateDownloadResponse.class)
+                .check(p -> indexAddress.equals(p.getAddress()))
+                .handle(x -> {
+                    put(data, receiver, false, 0);
+                });
 
             return;
         }
@@ -55,17 +60,24 @@ public class SDODownloadConversation extends CANOpenConversationBase {
             new SDOInitiateExpeditedUploadResponse(data)
         );
 
-        delegate.send(rq, (ctx) ->
-            ctx.onError((response, error) -> {
-                System.out.println("Unexpected frame " + response + " " + error);
+        delegate.send(createFrame(rq))
+            .check(this::isTransmitSDOFromReceiver)
+            .onTimeout(receiver::completeExceptionally)
+            .onError((response, error) -> {
+                if (error != null) {
+                    receiver.completeExceptionally(error);
+                } else {
+                    receiver.completeExceptionally(new PlcException("Transaction terminated"));
+                }
             })
+            .unwrap(CANOpenFrame::getPayload)
+            .only(CANOpenSDOResponse.class)
             .unwrap(CANOpenSDOResponse::getResponse)
             .only(SDOInitiateDownloadResponse.class)
             .check(r -> r.getCommand() == SDOResponseCommand.INITIATE_DOWNLOAD)
             .handle(r -> {
-                System.out.println(r);
-            })
-        );
+                receiver.complete(PlcResponseCode.OK);
+            });
     }
 
     private void put(byte[] data, CompletableFuture<PlcResponseCode> receiver, boolean toggle, int offset) {
@@ -73,24 +85,28 @@ public class SDODownloadConversation extends CANOpenConversationBase {
         byte[] segment = new byte[Math.min(remaining, 7)];
         System.arraycopy(data, offset, segment, 0, segment.length);
 
-        delegate.send(new SDOSegmentDownloadRequest(toggle, remaining <= 7, segment), (ctx) -> {
-            ctx.unwrap(CANOpenSDOResponse::getResponse)
-                .only(SDOSegmentDownloadResponse.class)
-                .onError((response, error) -> {
-                    if (error != null) {
-                        receiver.completeExceptionally(error);
-                    } else {
-                        receiver.completeExceptionally(new PlcException("Transaction terminated"));
-                    }
-                })
-                .check(response -> response.getToggle() == toggle)
-                .handle(reply -> {
-                    if (offset + segment.length == data.length) {
-                        receiver.complete(PlcResponseCode.OK);
-                    } else {
-                        put(data, receiver, !toggle, offset + segment.length);
-                    }
-                });
-        });
+        delegate.send(createFrame(new SDOSegmentDownloadRequest(toggle, remaining <= 7, segment)))
+            .check(this::isTransmitSDOFromReceiver)
+            .onTimeout(receiver::completeExceptionally)
+            .unwrap(CANOpenFrame::getPayload)
+            .only(CANOpenSDOResponse.class)
+            .unwrap(CANOpenSDOResponse::getResponse)
+            .only(SDOSegmentDownloadResponse.class)
+            .onError((response, error) -> {
+                if (error != null) {
+                    receiver.completeExceptionally(error);
+                } else {
+                    receiver.completeExceptionally(new PlcException("Transaction terminated"));
+                }
+            })
+            .check(response -> response.getToggle() == toggle)
+            .handle(reply -> {
+                if (offset + segment.length == data.length) {
+                    receiver.complete(PlcResponseCode.OK);
+                } else {
+                    put(data, receiver, !toggle, offset + segment.length);
+                }
+            });
     }
+
 }
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOUploadConversation.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOUploadConversation.java
index 0beb17c..6be036a 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOUploadConversation.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOUploadConversation.java
@@ -3,6 +3,7 @@ package org.apache.plc4x.java.can.api.conversation.canopen;
 import org.apache.plc4x.java.api.exceptions.PlcException;
 import org.apache.plc4x.java.api.value.PlcValue;
 import org.apache.plc4x.java.can.api.segmentation.accumulator.ByteStorage;
+import org.apache.plc4x.java.can.canopen.CANOpenFrame;
 import org.apache.plc4x.java.canopen.readwrite.*;
 import org.apache.plc4x.java.canopen.readwrite.types.CANOpenDataType;
 import org.apache.plc4x.java.spi.generation.ParseException;
@@ -15,12 +16,11 @@ import java.util.function.BiConsumer;
 public class SDOUploadConversation extends CANOpenConversationBase {
 
     private final Logger logger = LoggerFactory.getLogger(SDOUploadConversation.class);
-    private final SDOConversation delegate;
     private final IndexAddress address;
     private final CANOpenDataType type;
 
-    public SDOUploadConversation(SDOConversation delegate, IndexAddress address, CANOpenDataType type) {
-        this.delegate = delegate;
+    public SDOUploadConversation(CANConversation<CANOpenFrame> delegate, int nodeId, IndexAddress address, CANOpenDataType type) {
+        super(delegate, nodeId);
         this.address = address;
         this.type = type;
     }
@@ -28,8 +28,12 @@ public class SDOUploadConversation extends CANOpenConversationBase {
     public void execute(CompletableFuture<PlcValue> receiver) {
         SDOInitiateUploadRequest rq = new SDOInitiateUploadRequest(address);
 
-        delegate.send(rq, (ctx) ->
-            ctx.onError((response, error) -> {
+        delegate.send(createFrame(rq))
+            .check(this::isTransmitSDOFromReceiver)
+            .onTimeout(receiver::completeExceptionally)
+            .unwrap(CANOpenFrame::getPayload)
+            .only(CANOpenSDOResponse.class)
+            .onError((response, error) -> {
                 if (error != null) {
                     receiver.completeExceptionally(error);
                     return;
@@ -45,8 +49,7 @@ public class SDOUploadConversation extends CANOpenConversationBase {
             .check(response -> response.getAddress().equals(address))
             .handle(response -> {
                 handle(receiver, response);
-            })
-        );
+            });
     }
 
     private void handle(CompletableFuture<PlcValue> receiver, SDOInitiateUploadResponse answer) {
@@ -76,36 +79,38 @@ public class SDOUploadConversation extends CANOpenConversationBase {
 
     private void fetch(ByteStorage.SDOUploadStorage storage, BiConsumer<Integer, byte[]> valueCallback, CompletableFuture<PlcValue> receiver, boolean toggle, int size) {
         logger.info("Request next data block for address {}/{}", Integer.toHexString(address.getIndex()), Integer.toHexString(address.getSubindex()));
-        delegate.send(new SDOSegmentUploadRequest(toggle), (ctx) -> {
-            ctx.unwrap(CANOpenSDOResponse::getResponse)
-                .onError((response, error) -> {
-                    if (error != null) {
-                        receiver.completeExceptionally(error);
-                        return;
-                    }
+        delegate.send(createFrame(new SDOSegmentUploadRequest(toggle)))
+            .check(this::isTransmitSDOFromReceiver)
+            .onTimeout(receiver::completeExceptionally)
+            .unwrap(CANOpenFrame::getPayload)
+            .only(CANOpenSDOResponse.class)
+            .unwrap(CANOpenSDOResponse::getResponse)
+            .onError((response, error) -> {
+                if (error != null) {
+                    receiver.completeExceptionally(error);
+                    return;
+                }
 
-                    if (response instanceof SDOAbortResponse) {
-                        SDOAbortResponse abort = (SDOAbortResponse) response;
-                        SDOAbort sdoAbort = abort.getAbort();
-                        receiver.completeExceptionally(new PlcException("Could not read value. Remote party reported code " + sdoAbort.getCode()));
-                    }
-                })
-                .only(SDOSegmentUploadResponse.class)
-                .check(r -> r.getToggle() == toggle)
-                .handle(response -> {
-                    storage.append(response);
+                if (response instanceof SDOAbortResponse) {
+                    SDOAbortResponse abort = (SDOAbortResponse) response;
+                    SDOAbort sdoAbort = abort.getAbort();
+                    receiver.completeExceptionally(new PlcException("Could not read value. Remote party reported code " + sdoAbort.getCode()));
+                }
+            })
+            .only(SDOSegmentUploadResponse.class)
+            .check(r -> r.getToggle() == toggle)
+            .handle(response -> {
+                storage.append(response);
 
-                    if (response.getLast()) {
-                        // validate size
-                        logger.trace("Completed reading of data from {}/{}, collected {}, wanted {}", Integer.toHexString(address.getIndex()), Integer.toHexString(address.getSubindex()), storage.size(), size);
-                        valueCallback.accept(Long.valueOf(size).intValue(), storage.get());
-                    } else {
-                        logger.trace("Continue reading of data from {}/{}, collected {}, wanted {}", Integer.toHexString(address.getIndex()), Integer.toHexString(address.getSubindex()), storage.size(), size);
-                        fetch(storage, valueCallback, receiver, !toggle, size);
-                    }
-                });
-        });
+                if (response.getLast()) {
+                    // validate size
+                    logger.trace("Completed reading of data from {}/{}, collected {}, wanted {}", Integer.toHexString(address.getIndex()), Integer.toHexString(address.getSubindex()), storage.size(), size);
+                    valueCallback.accept(Long.valueOf(size).intValue(), storage.get());
+                } else {
+                    logger.trace("Continue reading of data from {}/{}, collected {}, wanted {}", Integer.toHexString(address.getIndex()), Integer.toHexString(address.getSubindex()), storage.size(), size);
+                    fetch(storage, valueCallback, receiver, !toggle, size);
+                }
+            });
     }
 
-
 }
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/configuration/CANConfiguration.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/configuration/CANConfiguration.java
index af4188a..6f18906 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/configuration/CANConfiguration.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/configuration/CANConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.plc4x.java.can.configuration;
 
 import org.apache.plc4x.java.spi.configuration.Configuration;
 import org.apache.plc4x.java.spi.configuration.annotations.ConfigurationParameter;
+import org.apache.plc4x.java.spi.configuration.annotations.defaults.IntDefaultValue;
 import org.apache.plc4x.java.transport.socketcan.CANTransportConfiguration;
 
 public class CANConfiguration implements Configuration, CANTransportConfiguration {
@@ -30,6 +31,10 @@ public class CANConfiguration implements Configuration, CANTransportConfiguratio
     @ConfigurationParameter
     private boolean heartbeat;
 
+    @ConfigurationParameter("request-timeout")
+    @IntDefaultValue(1000)
+    private int requestTimeout;
+
     public int getNodeId() {
         return nodeId;
     }
@@ -46,4 +51,12 @@ public class CANConfiguration implements Configuration, CANTransportConfiguratio
         this.heartbeat = heartbeat;
     }
 
+    public int getRequestTimeout() {
+        return requestTimeout;
+    }
+
+    public void setRequestTimeout(int requestTimeout) {
+        this.requestTimeout = requestTimeout;
+    }
+
 }
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenProtocolLogic.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenProtocolLogic.java
index a46b17d..f751b7c 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenProtocolLogic.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenProtocolLogic.java
@@ -31,7 +31,6 @@ import org.apache.plc4x.java.api.value.PlcUSINT;
 import org.apache.plc4x.java.api.value.PlcValue;
 import org.apache.plc4x.java.can.canopen.CANOpenFrame;
 import org.apache.plc4x.java.can.api.conversation.canopen.CANConversation;
-import org.apache.plc4x.java.can.api.conversation.canopen.CANOpenConversation;
 import org.apache.plc4x.java.can.api.conversation.canopen.SDODownloadConversation;
 import org.apache.plc4x.java.can.api.conversation.canopen.SDOUploadConversation;
 import org.apache.plc4x.java.can.canopen.CANOpenFrameBuilderFactory;
@@ -136,7 +135,7 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<CANOpenFrame> implem
     @Override
     public void setContext(ConversationContext<CANOpenFrame> context) {
         super.setContext(context);
-        this.conversation = new SocketCANConversation(configuration.getNodeId(), context, factory);
+        this.conversation = new SocketCANConversation(configuration.getNodeId(), context, configuration.getRequestTimeout(), factory);
     }
 
     private CANOpenFrame createFrame(CANOpenHeartbeatPayload state) throws ParseException {
@@ -186,8 +185,7 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<CANOpenFrame> implem
         });
 
         PlcValue writeValue = writeRequest.getPlcValues().get(0);
-        CANOpenConversation canopen = new CANOpenConversation(field.getNodeId(), conversation);
-        SDODownloadConversation download = canopen.sdo().download(new IndexAddress(field.getIndex(), field.getSubIndex()), writeValue, field.getCanOpenDataType());
+        SDODownloadConversation download = new SDODownloadConversation(conversation, field.getNodeId(), new IndexAddress(field.getIndex(), field.getSubIndex()), writeValue, field.getCanOpenDataType());
         transaction.submit(() -> download.execute(callback));
     }
 
@@ -285,8 +283,7 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<CANOpenFrame> implem
             transaction.endRequest();
         });
 
-        CANOpenConversation canopen = new CANOpenConversation(field.getNodeId(), conversation);
-        SDOUploadConversation upload = canopen.sdo().upload(new IndexAddress(field.getIndex(), field.getSubIndex()), field.getCanOpenDataType());
+        SDOUploadConversation upload = new SDOUploadConversation(conversation, field.getNodeId(), new IndexAddress(field.getIndex(), field.getSubIndex()), field.getCanOpenDataType());
         transaction.submit(() -> upload.execute(callback));
     }
 
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/socketcan/SocketCANConversation.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/socketcan/SocketCANConversation.java
index 721b0f1..305ed68 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/socketcan/SocketCANConversation.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/socketcan/SocketCANConversation.java
@@ -14,11 +14,13 @@ public class SocketCANConversation implements CANConversation<CANOpenFrame> {
 
     private final int nodeId;
     private final ConversationContext<CANOpenFrame> context;
+    private final int timeout;
     private final CANOpenFrameBuilderFactory factory;
 
-    public SocketCANConversation(int nodeId, ConversationContext<CANOpenFrame> context, CANOpenFrameBuilderFactory factory) {
+    public SocketCANConversation(int nodeId, ConversationContext<CANOpenFrame> context, int timeout, CANOpenFrameBuilderFactory factory) {
         this.nodeId = nodeId;
         this.context = context;
+        this.timeout = timeout;
         this.factory = factory;
     }
 
@@ -32,11 +34,9 @@ public class SocketCANConversation implements CANConversation<CANOpenFrame> {
         return factory.createBuilder();
     }
 
-    @Override
-    public void send(CANOpenFrame frame, Consumer<SendRequestContext<CANOpenFrame>> callback) {
-        SendRequestContext<CANOpenFrame> ctx = context.sendRequest(frame)
-            .expectResponse(CANOpenFrame.class, Duration.ofSeconds(10L));
-        callback.accept(ctx);
+    public SendRequestContext<CANOpenFrame> send(CANOpenFrame frame) {
+        return context.sendRequest(frame)
+            .expectResponse(CANOpenFrame.class, Duration.ofMillis(timeout));
     }
 
 }