You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by ld...@apache.org on 2020/11/12 00:42:27 UTC
[plc4x] 24/26: Update (simplify) conversation logic,
introduce timeouts and fix string serialization.
This is an automated email from the ASF dual-hosted git repository.
ldywicki pushed a commit to branch feature/socketcan
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 844f772f35b13f18f70915b47c323ab59bfc8f4f
Author: Ćukasz Dywicki <lu...@code-house.org>
AuthorDate: Fri Nov 6 11:26:23 2020 +0100
Update (simplify) conversation logic, introduce timeouts and fix string serialization.
---
.../socketcan/netty/SocketCANChannel.java | 19 ++++-
.../src/main/resources/protocols/can/canopen.mspec | 4 +-
.../api/conversation/canopen/CANConversation.java | 2 +-
.../conversation/canopen/CANOpenConversation.java | 41 ----------
.../canopen/CANOpenConversationBase.java | 23 ++++++
.../api/conversation/canopen/SDOConversation.java | 44 -----------
.../canopen/SDODownloadConversation.java | 92 +++++++++++++---------
.../canopen/SDOUploadConversation.java | 75 ++++++++++--------
.../java/can/configuration/CANConfiguration.java | 13 +++
.../java/can/protocol/CANOpenProtocolLogic.java | 9 +--
.../java/can/socketcan/SocketCANConversation.java | 12 +--
11 files changed, 159 insertions(+), 175 deletions(-)
diff --git a/plc4j/transports/socketcan/src/main/java/org/apache/plc4x/java/transport/socketcan/netty/SocketCANChannel.java b/plc4j/transports/socketcan/src/main/java/org/apache/plc4x/java/transport/socketcan/netty/SocketCANChannel.java
index f141e9e..afa4855 100644
--- a/plc4j/transports/socketcan/src/main/java/org/apache/plc4x/java/transport/socketcan/netty/SocketCANChannel.java
+++ b/plc4j/transports/socketcan/src/main/java/org/apache/plc4x/java/transport/socketcan/netty/SocketCANChannel.java
@@ -26,6 +26,7 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoop;
import io.netty.channel.oio.OioByteStreamChannel;
import org.apache.plc4x.java.transport.socketcan.netty.address.SocketCANAddress;
import org.slf4j.Logger;
@@ -115,16 +116,19 @@ public class SocketCANChannel extends OioByteStreamChannel {
while (!isInputShutdown()) {
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(16);
handle.readUnsafe(byteBuffer);
+ buffer.writeBytes(byteBuffer);
+// CanFrame frame = handle.read();
+// System.out.println("Read frame " + frame);
// frameBytes.writeBytes(frame.getBuffer());
// String dump = ByteBufUtil.prettyHexDump(frameBytes);
// System.out.println(frame + "\n" + dump);
- buffer.writeBytes(byteBuffer);
+// buffer.writeBytes(frame.getBuffer());
}
} catch (IOException e) {
logger.warn("Could not read data", e);
pipeline().fireExceptionCaught(e);
}
- });
+ }, "javacan-reader");
loopThread.start();
activate(new CANInputStream(buffer), new CANOutputStream(handle));
@@ -170,6 +174,11 @@ public class SocketCANChannel extends OioByteStreamChannel {
}
@Override
+ protected boolean isCompatible(EventLoop loop) {
+ return super.isCompatible(loop);
+ }
+
+ @Override
public ChannelConfig config() {
return this.config;
}
@@ -222,9 +231,15 @@ public class SocketCANChannel extends OioByteStreamChannel {
if (buf.readableBytes() > 0) {
return buf.readByte() & 0xFF;
}
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
}
throw new SocketTimeoutException();
}
+
}
diff --git a/protocols/can/src/main/resources/protocols/can/canopen.mspec b/protocols/can/src/main/resources/protocols/can/canopen.mspec
index 7886b88..1cb60fd 100644
--- a/protocols/can/src/main/resources/protocols/can/canopen.mspec
+++ b/protocols/can/src/main/resources/protocols/can/canopen.mspec
@@ -335,13 +335,13 @@
['CANOpenDataType.OCTET_STRING' String
[manual string 'UTF-8' 'value'
'STATIC_CALL("org.apache.plc4x.java.can.helper.CANOpenHelper.parseString", io, size, _type.encoding)'
- 'STATIC_CALL("org.apache.plc4x.java.can.helper.CANOpenHelper.serializeString", io, _value, _type.encoding)' '_value.length'
+ 'STATIC_CALL("org.apache.plc4x.java.can.helper.CANOpenHelper.serializeString", io, _value, _type.encoding)' '_value.length * 8'
]
]
['CANOpenDataType.VISIBLE_STRING' String
[manual string 'UTF-8' 'value'
'STATIC_CALL("org.apache.plc4x.java.can.helper.CANOpenHelper.parseString", io, size, _type.encoding)'
- 'STATIC_CALL("org.apache.plc4x.java.can.helper.CANOpenHelper.serializeString", io, _value, _type.encoding)' '_value.length'
+ 'STATIC_CALL("org.apache.plc4x.java.can.helper.CANOpenHelper.serializeString", io, _value, _type.encoding)' '_value.length * 8'
]
]
//CANOpenDataType.TIME_OF_DAY' CANOpenTime
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANConversation.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANConversation.java
index 471e8a6..67dd41b 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANConversation.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANConversation.java
@@ -12,7 +12,7 @@ public interface CANConversation<W extends CANOpenFrame> {
CANOpenFrameBuilder createBuilder();
- void send(W frame, Consumer<SendRequestContext<W>> callback);
+ SendRequestContext<W> send(W frame);
}
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversation.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversation.java
deleted file mode 100644
index 9785fee..0000000
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversation.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package org.apache.plc4x.java.can.api.conversation.canopen;
-
-import org.apache.plc4x.java.can.canopen.CANOpenFrame;
-import org.apache.plc4x.java.canopen.readwrite.CANOpenPayload;
-import org.apache.plc4x.java.canopen.readwrite.types.CANOpenService;
-import org.apache.plc4x.java.spi.ConversationContext.SendRequestContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.function.Consumer;
-
-public class CANOpenConversation {
-
- private final Logger logger = LoggerFactory.getLogger(CANOpenConversation.class);
- private final int node;
- private final CANConversation<CANOpenFrame> delegate;
-
- public CANOpenConversation(int node, CANConversation<CANOpenFrame> delegate) {
- this.node = node;
- this.delegate = delegate;
- }
-
- public SDOConversation sdo() {
- return new SDOConversation(this);
- }
-
- public void send(CANOpenService service, CANOpenPayload payload, Consumer<SendRequestContext<CANOpenPayload>> callback) {
- CANOpenFrame frame = delegate.createBuilder().withNodeId(node).withService(service).withPayload(payload).build();
- delegate.send(frame, (ctx) -> {
- SendRequestContext<CANOpenPayload> unwrap = ctx
-// .onError((response, error) -> {
-// System.err.println("Unexpected frame " + response + " " + error);
-// })
- .unwrap(CANOpenFrame::getPayload);
- callback.accept(unwrap);
- });
-
-
- }
-
-}
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversationBase.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversationBase.java
index 0c65988..28a6941 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversationBase.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversationBase.java
@@ -2,15 +2,38 @@ package org.apache.plc4x.java.can.api.conversation.canopen;
import org.apache.plc4x.java.api.value.PlcValue;
import org.apache.plc4x.java.can.canopen.CANOpenFrame;
+import org.apache.plc4x.java.canopen.readwrite.CANOpenSDORequest;
+import org.apache.plc4x.java.canopen.readwrite.SDORequest;
import org.apache.plc4x.java.canopen.readwrite.io.DataItemIO;
import org.apache.plc4x.java.canopen.readwrite.types.CANOpenDataType;
+import org.apache.plc4x.java.canopen.readwrite.types.CANOpenService;
import org.apache.plc4x.java.spi.generation.ParseException;
import org.apache.plc4x.java.spi.generation.ReadBuffer;
public abstract class CANOpenConversationBase {
+ protected final CANConversation<CANOpenFrame> delegate;
+ protected final int nodeId;
+
+ public CANOpenConversationBase(CANConversation<CANOpenFrame> delegate, int nodeId) {
+ this.delegate = delegate;
+ this.nodeId = nodeId;
+ }
+
protected PlcValue decodeFrom(byte[] data, CANOpenDataType type, int length) throws ParseException {
return DataItemIO.staticParse(new ReadBuffer(data, true), type, length);
}
+ protected boolean isTransmitSDOFromReceiver(CANOpenFrame frame) {
+ return frame.getNodeId() == nodeId && frame.getService() == CANOpenService.TRANSMIT_SDO;
+ }
+
+ protected CANOpenFrame createFrame(SDORequest rq) {
+ return delegate.createBuilder()
+ .withNodeId(nodeId)
+ .withService(CANOpenService.RECEIVE_SDO)
+ .withPayload(new CANOpenSDORequest(rq.getCommand(), rq))
+ .build();
+ }
+
}
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOConversation.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOConversation.java
deleted file mode 100644
index 251e4da..0000000
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOConversation.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package org.apache.plc4x.java.can.api.conversation.canopen;
-
-import org.apache.plc4x.java.api.value.PlcValue;
-import org.apache.plc4x.java.can.canopen.CANOpenFrame;
-import org.apache.plc4x.java.canopen.readwrite.CANOpenSDORequest;
-import org.apache.plc4x.java.canopen.readwrite.CANOpenSDOResponse;
-import org.apache.plc4x.java.canopen.readwrite.IndexAddress;
-import org.apache.plc4x.java.canopen.readwrite.SDORequest;
-import org.apache.plc4x.java.canopen.readwrite.types.CANOpenDataType;
-import org.apache.plc4x.java.canopen.readwrite.types.CANOpenService;
-import org.apache.plc4x.java.spi.ConversationContext.SendRequestContext;
-import org.apache.plc4x.java.spi.transaction.RequestTransactionManager.RequestTransaction;
-
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-
-public class SDOConversation {
-
- private final CANOpenConversation delegate;
-
- public SDOConversation(CANOpenConversation delegate) {
- this.delegate = delegate;
- }
-
- public SDODownloadConversation download(IndexAddress indexAddress, PlcValue value, CANOpenDataType type) {
- return new SDODownloadConversation(this, indexAddress, value, type);
- }
-
- public SDOUploadConversation upload(IndexAddress indexAddress, CANOpenDataType type) {
- return new SDOUploadConversation(this, indexAddress, type);
- }
-
- public void send(SDORequest request, Consumer<SendRequestContext<CANOpenSDOResponse>> callback) {
- delegate.send(CANOpenService.RECEIVE_SDO, new CANOpenSDORequest(request.getCommand(), request), (ctx) -> {
- SendRequestContext<CANOpenSDOResponse> context = ctx
-// .onError((response, error) -> {
-// System.out.println("Unexpected frame " + response + " " + error);
-// })
- .only(CANOpenSDOResponse.class);
- callback.accept(context);
- });
- }
-
-}
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDODownloadConversation.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDODownloadConversation.java
index 8e90bb6..ba0bffb 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDODownloadConversation.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDODownloadConversation.java
@@ -4,22 +4,24 @@ import org.apache.plc4x.java.api.exceptions.PlcException;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.java.api.value.PlcValue;
+import org.apache.plc4x.java.can.canopen.CANOpenFrame;
import org.apache.plc4x.java.canopen.readwrite.*;
import org.apache.plc4x.java.canopen.readwrite.io.DataItemIO;
import org.apache.plc4x.java.canopen.readwrite.types.CANOpenDataType;
+import org.apache.plc4x.java.canopen.readwrite.types.CANOpenService;
import org.apache.plc4x.java.canopen.readwrite.types.SDOResponseCommand;
import org.apache.plc4x.java.spi.generation.ParseException;
import java.util.concurrent.CompletableFuture;
-import java.util.function.BiConsumer;
public class SDODownloadConversation extends CANOpenConversationBase {
- private final SDOConversation delegate;
+ private final CANConversation<CANOpenFrame> delegate;
private final IndexAddress indexAddress;
private final byte[] data;
- public SDODownloadConversation(SDOConversation delegate, IndexAddress indexAddress, PlcValue value, CANOpenDataType type) {
+ public SDODownloadConversation(CANConversation<CANOpenFrame> delegate, int nodeId, IndexAddress indexAddress, PlcValue value, CANOpenDataType type) {
+ super(delegate, nodeId);
this.delegate = delegate;
this.indexAddress = indexAddress;
@@ -33,17 +35,20 @@ public class SDODownloadConversation extends CANOpenConversationBase {
public void execute(CompletableFuture<PlcResponseCode> receiver) {
if (data.length > 4) {
// segmented
-
SDOInitiateSegmentedUploadResponse size = new SDOInitiateSegmentedUploadResponse(data.length);
- delegate.send(new SDOInitiateDownloadRequest(false, true, indexAddress, size), (ctx) -> {
- ctx.unwrap(CANOpenSDOResponse::getResponse)
- .check(p -> p.getCommand() == SDOResponseCommand.INITIATE_DOWNLOAD)
- .only(SDOInitiateDownloadResponse.class)
- .check(p -> indexAddress.equals(p.getAddress()))
- .handle(x -> {
- put(data, receiver, false, 0);
- });
- });
+ delegate.send(createFrame(new SDOInitiateDownloadRequest(false, true, indexAddress, size)))
+ .check(this::isTransmitSDOFromReceiver)
+ .onTimeout(receiver::completeExceptionally)
+ .onError((response, error) -> receiver.completeExceptionally(error))
+ .unwrap(CANOpenFrame::getPayload)
+ .only(CANOpenSDOResponse.class)
+ .unwrap(CANOpenSDOResponse::getResponse)
+ .check(p -> p.getCommand() == SDOResponseCommand.INITIATE_DOWNLOAD)
+ .only(SDOInitiateDownloadResponse.class)
+ .check(p -> indexAddress.equals(p.getAddress()))
+ .handle(x -> {
+ put(data, receiver, false, 0);
+ });
return;
}
@@ -55,17 +60,24 @@ public class SDODownloadConversation extends CANOpenConversationBase {
new SDOInitiateExpeditedUploadResponse(data)
);
- delegate.send(rq, (ctx) ->
- ctx.onError((response, error) -> {
- System.out.println("Unexpected frame " + response + " " + error);
+ delegate.send(createFrame(rq))
+ .check(this::isTransmitSDOFromReceiver)
+ .onTimeout(receiver::completeExceptionally)
+ .onError((response, error) -> {
+ if (error != null) {
+ receiver.completeExceptionally(error);
+ } else {
+ receiver.completeExceptionally(new PlcException("Transaction terminated"));
+ }
})
+ .unwrap(CANOpenFrame::getPayload)
+ .only(CANOpenSDOResponse.class)
.unwrap(CANOpenSDOResponse::getResponse)
.only(SDOInitiateDownloadResponse.class)
.check(r -> r.getCommand() == SDOResponseCommand.INITIATE_DOWNLOAD)
.handle(r -> {
- System.out.println(r);
- })
- );
+ receiver.complete(PlcResponseCode.OK);
+ });
}
private void put(byte[] data, CompletableFuture<PlcResponseCode> receiver, boolean toggle, int offset) {
@@ -73,24 +85,28 @@ public class SDODownloadConversation extends CANOpenConversationBase {
byte[] segment = new byte[Math.min(remaining, 7)];
System.arraycopy(data, offset, segment, 0, segment.length);
- delegate.send(new SDOSegmentDownloadRequest(toggle, remaining <= 7, segment), (ctx) -> {
- ctx.unwrap(CANOpenSDOResponse::getResponse)
- .only(SDOSegmentDownloadResponse.class)
- .onError((response, error) -> {
- if (error != null) {
- receiver.completeExceptionally(error);
- } else {
- receiver.completeExceptionally(new PlcException("Transaction terminated"));
- }
- })
- .check(response -> response.getToggle() == toggle)
- .handle(reply -> {
- if (offset + segment.length == data.length) {
- receiver.complete(PlcResponseCode.OK);
- } else {
- put(data, receiver, !toggle, offset + segment.length);
- }
- });
- });
+ delegate.send(createFrame(new SDOSegmentDownloadRequest(toggle, remaining <= 7, segment)))
+ .check(this::isTransmitSDOFromReceiver)
+ .onTimeout(receiver::completeExceptionally)
+ .unwrap(CANOpenFrame::getPayload)
+ .only(CANOpenSDOResponse.class)
+ .unwrap(CANOpenSDOResponse::getResponse)
+ .only(SDOSegmentDownloadResponse.class)
+ .onError((response, error) -> {
+ if (error != null) {
+ receiver.completeExceptionally(error);
+ } else {
+ receiver.completeExceptionally(new PlcException("Transaction terminated"));
+ }
+ })
+ .check(response -> response.getToggle() == toggle)
+ .handle(reply -> {
+ if (offset + segment.length == data.length) {
+ receiver.complete(PlcResponseCode.OK);
+ } else {
+ put(data, receiver, !toggle, offset + segment.length);
+ }
+ });
}
+
}
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOUploadConversation.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOUploadConversation.java
index 0beb17c..6be036a 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOUploadConversation.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOUploadConversation.java
@@ -3,6 +3,7 @@ package org.apache.plc4x.java.can.api.conversation.canopen;
import org.apache.plc4x.java.api.exceptions.PlcException;
import org.apache.plc4x.java.api.value.PlcValue;
import org.apache.plc4x.java.can.api.segmentation.accumulator.ByteStorage;
+import org.apache.plc4x.java.can.canopen.CANOpenFrame;
import org.apache.plc4x.java.canopen.readwrite.*;
import org.apache.plc4x.java.canopen.readwrite.types.CANOpenDataType;
import org.apache.plc4x.java.spi.generation.ParseException;
@@ -15,12 +16,11 @@ import java.util.function.BiConsumer;
public class SDOUploadConversation extends CANOpenConversationBase {
private final Logger logger = LoggerFactory.getLogger(SDOUploadConversation.class);
- private final SDOConversation delegate;
private final IndexAddress address;
private final CANOpenDataType type;
- public SDOUploadConversation(SDOConversation delegate, IndexAddress address, CANOpenDataType type) {
- this.delegate = delegate;
+ public SDOUploadConversation(CANConversation<CANOpenFrame> delegate, int nodeId, IndexAddress address, CANOpenDataType type) {
+ super(delegate, nodeId);
this.address = address;
this.type = type;
}
@@ -28,8 +28,12 @@ public class SDOUploadConversation extends CANOpenConversationBase {
public void execute(CompletableFuture<PlcValue> receiver) {
SDOInitiateUploadRequest rq = new SDOInitiateUploadRequest(address);
- delegate.send(rq, (ctx) ->
- ctx.onError((response, error) -> {
+ delegate.send(createFrame(rq))
+ .check(this::isTransmitSDOFromReceiver)
+ .onTimeout(receiver::completeExceptionally)
+ .unwrap(CANOpenFrame::getPayload)
+ .only(CANOpenSDOResponse.class)
+ .onError((response, error) -> {
if (error != null) {
receiver.completeExceptionally(error);
return;
@@ -45,8 +49,7 @@ public class SDOUploadConversation extends CANOpenConversationBase {
.check(response -> response.getAddress().equals(address))
.handle(response -> {
handle(receiver, response);
- })
- );
+ });
}
private void handle(CompletableFuture<PlcValue> receiver, SDOInitiateUploadResponse answer) {
@@ -76,36 +79,38 @@ public class SDOUploadConversation extends CANOpenConversationBase {
private void fetch(ByteStorage.SDOUploadStorage storage, BiConsumer<Integer, byte[]> valueCallback, CompletableFuture<PlcValue> receiver, boolean toggle, int size) {
logger.info("Request next data block for address {}/{}", Integer.toHexString(address.getIndex()), Integer.toHexString(address.getSubindex()));
- delegate.send(new SDOSegmentUploadRequest(toggle), (ctx) -> {
- ctx.unwrap(CANOpenSDOResponse::getResponse)
- .onError((response, error) -> {
- if (error != null) {
- receiver.completeExceptionally(error);
- return;
- }
+ delegate.send(createFrame(new SDOSegmentUploadRequest(toggle)))
+ .check(this::isTransmitSDOFromReceiver)
+ .onTimeout(receiver::completeExceptionally)
+ .unwrap(CANOpenFrame::getPayload)
+ .only(CANOpenSDOResponse.class)
+ .unwrap(CANOpenSDOResponse::getResponse)
+ .onError((response, error) -> {
+ if (error != null) {
+ receiver.completeExceptionally(error);
+ return;
+ }
- if (response instanceof SDOAbortResponse) {
- SDOAbortResponse abort = (SDOAbortResponse) response;
- SDOAbort sdoAbort = abort.getAbort();
- receiver.completeExceptionally(new PlcException("Could not read value. Remote party reported code " + sdoAbort.getCode()));
- }
- })
- .only(SDOSegmentUploadResponse.class)
- .check(r -> r.getToggle() == toggle)
- .handle(response -> {
- storage.append(response);
+ if (response instanceof SDOAbortResponse) {
+ SDOAbortResponse abort = (SDOAbortResponse) response;
+ SDOAbort sdoAbort = abort.getAbort();
+ receiver.completeExceptionally(new PlcException("Could not read value. Remote party reported code " + sdoAbort.getCode()));
+ }
+ })
+ .only(SDOSegmentUploadResponse.class)
+ .check(r -> r.getToggle() == toggle)
+ .handle(response -> {
+ storage.append(response);
- if (response.getLast()) {
- // validate size
- logger.trace("Completed reading of data from {}/{}, collected {}, wanted {}", Integer.toHexString(address.getIndex()), Integer.toHexString(address.getSubindex()), storage.size(), size);
- valueCallback.accept(Long.valueOf(size).intValue(), storage.get());
- } else {
- logger.trace("Continue reading of data from {}/{}, collected {}, wanted {}", Integer.toHexString(address.getIndex()), Integer.toHexString(address.getSubindex()), storage.size(), size);
- fetch(storage, valueCallback, receiver, !toggle, size);
- }
- });
- });
+ if (response.getLast()) {
+ // validate size
+ logger.trace("Completed reading of data from {}/{}, collected {}, wanted {}", Integer.toHexString(address.getIndex()), Integer.toHexString(address.getSubindex()), storage.size(), size);
+ valueCallback.accept(Long.valueOf(size).intValue(), storage.get());
+ } else {
+ logger.trace("Continue reading of data from {}/{}, collected {}, wanted {}", Integer.toHexString(address.getIndex()), Integer.toHexString(address.getSubindex()), storage.size(), size);
+ fetch(storage, valueCallback, receiver, !toggle, size);
+ }
+ });
}
-
}
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/configuration/CANConfiguration.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/configuration/CANConfiguration.java
index af4188a..6f18906 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/configuration/CANConfiguration.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/configuration/CANConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.plc4x.java.can.configuration;
import org.apache.plc4x.java.spi.configuration.Configuration;
import org.apache.plc4x.java.spi.configuration.annotations.ConfigurationParameter;
+import org.apache.plc4x.java.spi.configuration.annotations.defaults.IntDefaultValue;
import org.apache.plc4x.java.transport.socketcan.CANTransportConfiguration;
public class CANConfiguration implements Configuration, CANTransportConfiguration {
@@ -30,6 +31,10 @@ public class CANConfiguration implements Configuration, CANTransportConfiguratio
@ConfigurationParameter
private boolean heartbeat;
+ @ConfigurationParameter("request-timeout")
+ @IntDefaultValue(1000)
+ private int requestTimeout;
+
public int getNodeId() {
return nodeId;
}
@@ -46,4 +51,12 @@ public class CANConfiguration implements Configuration, CANTransportConfiguratio
this.heartbeat = heartbeat;
}
+ public int getRequestTimeout() {
+ return requestTimeout;
+ }
+
+ public void setRequestTimeout(int requestTimeout) {
+ this.requestTimeout = requestTimeout;
+ }
+
}
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenProtocolLogic.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenProtocolLogic.java
index a46b17d..f751b7c 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenProtocolLogic.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenProtocolLogic.java
@@ -31,7 +31,6 @@ import org.apache.plc4x.java.api.value.PlcUSINT;
import org.apache.plc4x.java.api.value.PlcValue;
import org.apache.plc4x.java.can.canopen.CANOpenFrame;
import org.apache.plc4x.java.can.api.conversation.canopen.CANConversation;
-import org.apache.plc4x.java.can.api.conversation.canopen.CANOpenConversation;
import org.apache.plc4x.java.can.api.conversation.canopen.SDODownloadConversation;
import org.apache.plc4x.java.can.api.conversation.canopen.SDOUploadConversation;
import org.apache.plc4x.java.can.canopen.CANOpenFrameBuilderFactory;
@@ -136,7 +135,7 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<CANOpenFrame> implem
@Override
public void setContext(ConversationContext<CANOpenFrame> context) {
super.setContext(context);
- this.conversation = new SocketCANConversation(configuration.getNodeId(), context, factory);
+ this.conversation = new SocketCANConversation(configuration.getNodeId(), context, configuration.getRequestTimeout(), factory);
}
private CANOpenFrame createFrame(CANOpenHeartbeatPayload state) throws ParseException {
@@ -186,8 +185,7 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<CANOpenFrame> implem
});
PlcValue writeValue = writeRequest.getPlcValues().get(0);
- CANOpenConversation canopen = new CANOpenConversation(field.getNodeId(), conversation);
- SDODownloadConversation download = canopen.sdo().download(new IndexAddress(field.getIndex(), field.getSubIndex()), writeValue, field.getCanOpenDataType());
+ SDODownloadConversation download = new SDODownloadConversation(conversation, field.getNodeId(), new IndexAddress(field.getIndex(), field.getSubIndex()), writeValue, field.getCanOpenDataType());
transaction.submit(() -> download.execute(callback));
}
@@ -285,8 +283,7 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<CANOpenFrame> implem
transaction.endRequest();
});
- CANOpenConversation canopen = new CANOpenConversation(field.getNodeId(), conversation);
- SDOUploadConversation upload = canopen.sdo().upload(new IndexAddress(field.getIndex(), field.getSubIndex()), field.getCanOpenDataType());
+ SDOUploadConversation upload = new SDOUploadConversation(conversation, field.getNodeId(), new IndexAddress(field.getIndex(), field.getSubIndex()), field.getCanOpenDataType());
transaction.submit(() -> upload.execute(callback));
}
diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/socketcan/SocketCANConversation.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/socketcan/SocketCANConversation.java
index 721b0f1..305ed68 100644
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/socketcan/SocketCANConversation.java
+++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/socketcan/SocketCANConversation.java
@@ -14,11 +14,13 @@ public class SocketCANConversation implements CANConversation<CANOpenFrame> {
private final int nodeId;
private final ConversationContext<CANOpenFrame> context;
+ private final int timeout;
private final CANOpenFrameBuilderFactory factory;
- public SocketCANConversation(int nodeId, ConversationContext<CANOpenFrame> context, CANOpenFrameBuilderFactory factory) {
+ public SocketCANConversation(int nodeId, ConversationContext<CANOpenFrame> context, int timeout, CANOpenFrameBuilderFactory factory) {
this.nodeId = nodeId;
this.context = context;
+ this.timeout = timeout;
this.factory = factory;
}
@@ -32,11 +34,9 @@ public class SocketCANConversation implements CANConversation<CANOpenFrame> {
return factory.createBuilder();
}
- @Override
- public void send(CANOpenFrame frame, Consumer<SendRequestContext<CANOpenFrame>> callback) {
- SendRequestContext<CANOpenFrame> ctx = context.sendRequest(frame)
- .expectResponse(CANOpenFrame.class, Duration.ofSeconds(10L));
- callback.accept(ctx);
+ public SendRequestContext<CANOpenFrame> send(CANOpenFrame frame) {
+ return context.sendRequest(frame)
+ .expectResponse(CANOpenFrame.class, Duration.ofMillis(timeout));
}
}