You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2018/02/22 18:07:46 UTC

[incubator-plc4x] 02/02: implement proper payload encoding decoding for ads

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit e1450378230a167e2565b16dc5dca41d046452d9
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Thu Feb 22 19:07:39 2018 +0100

    implement proper payload encoding decoding for ads
---
 .../java/ads/api/commands/types/TimeStamp.java     |   6 +-
 .../plc4x/java/ads/netty/Plc4XADSProtocol.java     |  28 ++--
 .../java/ads/netty/util/LittleEndianDecoder.java   | 107 +++++++++++++++
 .../java/ads/netty/util/LittleEndianEncoder.java   | 145 +++++++++++++++++++++
 .../plc4x/java/ads/netty/Plc4XADSProtocolTest.java |  82 ++++++++----
 5 files changed, 319 insertions(+), 49 deletions(-)

diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/api/commands/types/TimeStamp.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/api/commands/types/TimeStamp.java
index d46fecf..c0ce2f0 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/api/commands/types/TimeStamp.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/api/commands/types/TimeStamp.java
@@ -63,7 +63,7 @@ public class TimeStamp extends ByteValue {
     private static byte checkByte(byte[] valueBytes, int length, int i) {
         return length > i ? valueBytes[i] : 0;
     }
-    
+
     private static byte[] ofBigInteger(BigInteger value) {
         byte[] valueBytes = value.toByteArray();
         int length = valueBytes.length;
@@ -129,12 +129,12 @@ public class TimeStamp extends ByteValue {
         return new Date(winTimeToJava(bigIntegerValue).longValue());
     }
 
-    private static BigInteger javaToWinTime(BigInteger timeMillisSince19700101) {
+    public static BigInteger javaToWinTime(BigInteger timeMillisSince19700101) {
         BigInteger timeMillisSince16010101 = EPOCH_DIFF_IN_MILLIS.add(timeMillisSince19700101);
         return timeMillisSince16010101.multiply(BigInteger.valueOf(10_000));
     }
 
-    private static BigInteger winTimeToJava(BigInteger winTime) {
+    public static BigInteger winTimeToJava(BigInteger winTime) {
         BigInteger timeMillisSince16010101 = winTime.divide(BigInteger.valueOf(10_000));
         return timeMillisSince16010101.subtract(EPOCH_DIFF_IN_MILLIS);
     }
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/netty/Plc4XADSProtocol.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/netty/Plc4XADSProtocol.java
index a8de70a..a6f506b 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/netty/Plc4XADSProtocol.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/netty/Plc4XADSProtocol.java
@@ -20,7 +20,6 @@ package org.apache.plc4x.java.ads.netty;
 
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToMessageCodec;
-import org.apache.commons.lang3.SerializationUtils;
 import org.apache.plc4x.java.ads.api.commands.ADSReadRequest;
 import org.apache.plc4x.java.ads.api.commands.ADSReadResponse;
 import org.apache.plc4x.java.ads.api.commands.ADSWriteRequest;
@@ -47,15 +46,15 @@ import org.apache.plc4x.java.api.types.ResponseCode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.apache.plc4x.java.ads.netty.util.LittleEndianDecoder.decodeData;
+import static org.apache.plc4x.java.ads.netty.util.LittleEndianEncoder.encodeData;
+
 public class Plc4XADSProtocol extends MessageToMessageCodec<AMSTCPPacket, PlcRequestContainer<PlcRequest, PlcResponse>> {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(Plc4XADSProtocol.class);
@@ -102,17 +101,7 @@ public class Plc4XADSProtocol extends MessageToMessageCodec<AMSTCPPacket, PlcReq
         Invoke invokeId = Invoke.of(correlationBuilder.incrementAndGet());
         IndexGroup indexGroup = IndexGroup.of(adsAddress.getIndexGroup());
         IndexOffset indexOffset = IndexOffset.of(adsAddress.getIndexOffset());
-        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
-        for (Object o : writeRequestItem.getValues()) {
-            // TODO: we need custom serialization here as java types don't really help here
-            byte[] serialize = SerializationUtils.serialize((Serializable) o);
-            try {
-                byteArrayOutputStream.write(serialize);
-            } catch (IOException e) {
-                throw new PlcException(e);
-            }
-        }
-        byte[] bytes = byteArrayOutputStream.toByteArray();
+        byte[] bytes = encodeData(writeRequestItem.getDatatype(), writeRequestItem.getValues().toArray());
         Data data = Data.of(bytes);
         AMSTCPPacket amstcpPacket = ADSWriteRequest.of(targetAmsNetId, targetAmsPort, sourceAmsNetId, sourceAmsPort, invokeId, indexGroup, indexOffset, data);
         out.add(amstcpPacket);
@@ -187,19 +176,18 @@ public class Plc4XADSProtocol extends MessageToMessageCodec<AMSTCPPacket, PlcReq
     }
 
     @SuppressWarnings("unchecked")
-    private PlcResponse decodeReadResponse(ADSReadResponse responseMessage, PlcRequestContainer<PlcRequest, PlcResponse> requestContainer) {
+    private PlcResponse decodeReadResponse(ADSReadResponse responseMessage, PlcRequestContainer<PlcRequest, PlcResponse> requestContainer) throws PlcProtocolException {
         PlcReadRequest plcReadRequest = (PlcReadRequest) requestContainer.getRequest();
         ReadRequestItem requestItem = plcReadRequest.getRequestItems().get(0);
 
         ResponseCode responseCode = decodeResponseCode(responseMessage.getResult());
         byte[] bytes = responseMessage.getData().getBytes();
-        // TODO: we need custom serialization here as java types don't really help here
-        Object deserialize = SerializationUtils.deserialize(bytes);
+        List decoded = decodeData(requestItem.getDatatype(), bytes);
 
         if (plcReadRequest instanceof TypeSafePlcReadRequest) {
-            return new TypeSafePlcReadResponse((TypeSafePlcReadRequest) plcReadRequest, Collections.singletonList(new ReadResponseItem<>(requestItem, responseCode, Collections.singletonList(deserialize))));
+            return new TypeSafePlcReadResponse((TypeSafePlcReadRequest) plcReadRequest, Collections.singletonList(new ReadResponseItem<>(requestItem, responseCode, decoded)));
         } else {
-            return new PlcReadResponse(plcReadRequest, Collections.singletonList(new ReadResponseItem<>(requestItem, responseCode, Collections.singletonList(bytes))));
+            return new PlcReadResponse(plcReadRequest, Collections.singletonList(new ReadResponseItem<>(requestItem, responseCode, decoded)));
         }
     }
 
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/netty/util/LittleEndianDecoder.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/netty/util/LittleEndianDecoder.java
new file mode 100644
index 0000000..8211f95
--- /dev/null
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/netty/util/LittleEndianDecoder.java
@@ -0,0 +1,107 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ */
+package org.apache.plc4x.java.ads.netty.util;
+
+import org.apache.plc4x.java.ads.api.commands.types.TimeStamp;
+import org.apache.plc4x.java.api.exceptions.PlcProtocolException;
+
+import java.math.BigInteger;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+
+/** Decodes little-endian ADS payload bytes into lists of Java values. */
+public class LittleEndianDecoder {
+
+    private LittleEndianDecoder() {
+        // Utility class
+    }
+
+    /**
+     * Decodes {@code adsData} as consecutive values of {@code datatype}.
+     * Supported: zero-terminated {@link String}, {@link Boolean} and {@link Byte} (1 byte),
+     * {@link Short} (2 bytes), {@link Integer} and {@link Float} (4 bytes) and {@link Calendar}
+     * (8 bytes, converted with {@code TimeStamp.winTimeToJava}). Least significant byte comes first.
+     * @param datatype the java type every element is decoded to.
+     * @param adsData  the raw payload; an empty array yields an empty list.
+     * @param <T>      element type of the returned list.
+     * @return the decoded values in payload order.
+     * @throws PlcProtocolException if the datatype is unsupported or the payload ends inside a value.
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> List<T> decodeData(Class<T> datatype, byte[] adsData) throws PlcProtocolException {
+        List<Object> result = new LinkedList<>();
+        int i = 0;
+        final int length = adsData.length;
+        while (i < length) {
+            if (datatype == String.class) {
+                StringBuilder builder = new StringBuilder();
+                // Test the byte at the cursor each step; a byte cached before the loop never changes.
+                while (i < length && adsData[i] != (byte) 0x0) {
+                    builder.append((char) adsData[i]);
+                    i++;
+                }
+                i++; // skip terminating character
+                result.add(builder.toString());
+            } else if (datatype == Boolean.class) {
+                result.add((adsData[i] & 0x01) == 0x01);
+                i += 1;
+            } else if (datatype == Byte.class) {
+                result.add(adsData[i]);
+                i += 1;
+            } else if (datatype == Short.class) {
+                result.add((short) readLittleEndian(adsData, i, 2));
+                i += 2;
+            } else if (datatype == Integer.class) {
+                result.add((int) readLittleEndian(adsData, i, 4));
+                i += 4;
+            } else if (datatype == Float.class) {
+                // TODO: check how ads expects this data
+                // Description of the Real number format:
+                // https://www.sps-lehrgang.de/zahlenformate-step7/#c144
+                // https://de.wikipedia.org/wiki/IEEE_754
+                result.add(Float.intBitsToFloat((int) readLittleEndian(adsData, i, 4)));
+                i += 4;
+            } else if (datatype == Calendar.class || Calendar.class.isAssignableFrom(datatype)) {
+                // The 8 payload bytes form one signed 64 bit value, least significant byte first.
+                BigInteger winTime = BigInteger.valueOf(readLittleEndian(adsData, i, 8));
+                Calendar calendar = Calendar.getInstance();
+                calendar.setTime(new Date(TimeStamp.winTimeToJava(winTime).longValue()));
+                result.add(calendar);
+                i += 8;
+            } else {
+                throw new PlcProtocolException("Unsupported datatype " + datatype.getSimpleName());
+            }
+        }
+        return (List<T>) result;
+    }
+
+    /** Reads {@code width} bytes starting at {@code offset}, least significant byte first. */
+    private static long readLittleEndian(byte[] adsData, int offset, int width) throws PlcProtocolException {
+        if (adsData.length - offset < width) {
+            throw new PlcProtocolException("Payload too short: need " + width + " bytes at offset " + offset);
+        }
+        long value = 0;
+        for (int index = 0; index < width; index++) {
+            value |= (adsData[offset + index] & 0xffL) << (8 * index);
+        }
+        return value;
+    }
+}
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/netty/util/LittleEndianEncoder.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/netty/util/LittleEndianEncoder.java
new file mode 100644
index 0000000..22d2dd1
--- /dev/null
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/netty/util/LittleEndianEncoder.java
@@ -0,0 +1,145 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ */
+package org.apache.plc4x.java.ads.netty.util;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.plc4x.java.ads.api.commands.types.TimeStamp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.stream.Stream;
+
+/** Encodes Java values into little-endian byte sequences for ADS payloads. */
+public class LittleEndianEncoder {
+
+    private LittleEndianEncoder() {
+        // Utility class
+    }
+
+    /**
+     * Encodes all {@code values} as {@code valueType} and concatenates the results in order.
+     * @param valueType the type every element of {@code values} is cast to.
+     * @param values    the values to encode; an empty array yields an empty byte array.
+     * @return the concatenated encoded bytes.
+     * @throws IllegalArgumentException if {@code values} is non-empty and {@code valueType} is unsupported.
+     */
+    public static byte[] encodeData(Class<?> valueType, Object[] values) {
+        if (values.length == 0) {
+            return new byte[]{};
+        }
+        Stream<byte[]> result;
+        if (valueType == Boolean.class) {
+            result = encodeBoolean(Arrays.stream(values).map(Boolean.class::cast));
+        } else if (valueType == Byte.class) {
+            result = encodeByte(Arrays.stream(values).map(Byte.class::cast));
+        } else if (valueType == Short.class) {
+            result = encodeShort(Arrays.stream(values).map(Short.class::cast));
+        } else if (valueType == Integer.class) {
+            result = encodeInteger(Arrays.stream(values).map(Integer.class::cast));
+        } else if (valueType == Calendar.class || Calendar.class.isAssignableFrom(valueType)) {
+            result = encodeCalendar(Arrays.stream(values).map(Calendar.class::cast));
+        } else if (valueType == Float.class) {
+            result = encodeFloat(Arrays.stream(values).map(Float.class::cast));
+        } else if (valueType == String.class) {
+            result = encodeString(Arrays.stream(values).map(String.class::cast));
+        } else {
+            throw new IllegalArgumentException("Unsupported type " + valueType);
+        }
+        // TODO: maybe we can replace this by a smarter flatmap
+        return result.collect(
+            ByteArrayOutputStream::new,
+            (bos, byteValue) -> {
+                try {
+                    bos.write(byteValue);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            },
+            // Append the right-hand buffer so no bytes are dropped if the stream is ever split.
+            (a, b) -> a.write(b.toByteArray(), 0, b.size())).toByteArray();
+    }
+
+    /** Encodes each string with the platform default charset followed by a single 0x0 byte. */
+    public static Stream<byte[]> encodeString(Stream<String> stringStream) {
+        // TODO: what do we do with utf-8 values with 2 bytes? what is the charset here?
+        return stringStream
+            .map(s -> s.getBytes(Charset.defaultCharset()))
+            // TODO: this 0 termination is from s7 but might be completely wrong in ads. Guess its a terminator
+            .map(bytes -> ArrayUtils.add(bytes, (byte) 0x0));
+    }
+
+    /** Encodes each float as the 4 bytes of its {@code Float.floatToIntBits} value, least significant byte first. */
+    public static Stream<byte[]> encodeFloat(Stream<Float> floatStream) {
+        // TODO: check how ads expects this data
+        return encodeInteger(floatStream.map(Float::floatToIntBits));
+    }
+
+    /** Encodes each int as 4 bytes, least significant byte first. */
+    public static Stream<byte[]> encodeInteger(Stream<Integer> integerStream) {
+        return integerStream
+            .map(intValue -> new byte[]{
+                (byte) ((intValue & 0x000000ff)),
+                (byte) ((intValue & 0x0000ff00) >> 8),
+                (byte) ((intValue & 0x00ff0000) >> 16),
+                (byte) ((intValue & 0xff000000) >> 24),
+            });
+    }
+
+    /** Encodes each calendar's time, converted by {@code TimeStamp.javaToWinTime}, as 8 bytes, least significant byte first. */
+    public static Stream<byte[]> encodeCalendar(Stream<Calendar> calendarStream) {
+        return calendarStream
+            .map(Calendar::getTime)
+            .map(Date::getTime)
+            .map(millis -> TimeStamp.javaToWinTime(BigInteger.valueOf(millis)).longValue())
+            .map(time -> new byte[]{
+                (byte) ((time & 0x00000000_000000ffL)),
+                (byte) ((time & 0x00000000_0000ff00L) >> 8),
+                (byte) ((time & 0x00000000_00ff0000L) >> 16),
+                (byte) ((time & 0x00000000_ff000000L) >> 24),
+                (byte) ((time & 0x000000ff_00000000L) >> 32),
+                (byte) ((time & 0x0000ff00_00000000L) >> 40),
+                (byte) ((time & 0x00ff0000_00000000L) >> 48),
+                (byte) ((time & 0xff000000_00000000L) >> 56),
+            });
+    }
+
+    /** Encodes each short as 2 bytes, least significant byte first. */
+    public static Stream<byte[]> encodeShort(Stream<Short> shortStream) {
+        return shortStream
+            .map(shortValue -> new byte[]{
+                (byte) ((shortValue & 0x00ff)),
+                (byte) ((shortValue & 0xff00) >> 8),
+            });
+    }
+
+    /** Encodes each byte as a single-element array. */
+    public static Stream<byte[]> encodeByte(Stream<Byte> byteStream) {
+        return byteStream.map(aByte -> new byte[]{aByte});
+    }
+
+    /** Encodes each boolean as one byte: 0x01 for true, 0x00 for false. */
+    public static Stream<byte[]> encodeBoolean(Stream<Boolean> booleanStream) {
+        return booleanStream.map(booleanValue -> new byte[]{booleanValue ? (byte) 0x01 : (byte) 0x00});
+    }
+}
diff --git a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/netty/Plc4XADSProtocolTest.java b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/netty/Plc4XADSProtocolTest.java
index 72ed38c..16eb05c 100644
--- a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/netty/Plc4XADSProtocolTest.java
+++ b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/netty/Plc4XADSProtocolTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.plc4x.java.ads.netty;
 
-import org.apache.commons.lang3.SerializationUtils;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.plc4x.java.ads.api.commands.ADSReadResponse;
 import org.apache.plc4x.java.ads.api.commands.ADSWriteResponse;
@@ -41,6 +40,7 @@ import org.slf4j.LoggerFactory;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.util.ArrayList;
+import java.util.Calendar;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
@@ -57,47 +57,77 @@ public class Plc4XADSProtocolTest {
 
     private Plc4XADSProtocol SUT;
 
-    @Parameterized.Parameter()
-    public PlcRequestContainer<PlcRequest, PlcResponse> plcRequestContainer;
+    @Parameterized.Parameter
+    public String payloadClazzName;
 
     @Parameterized.Parameter(1)
-    public CompletableFuture completableFuture;
+    public PlcRequestContainer<PlcRequest, PlcResponse> plcRequestContainer;
 
     @Parameterized.Parameter(2)
-    public String plcRequestContainerClassName;
+    public CompletableFuture completableFuture;
 
     @Parameterized.Parameter(3)
-    public AMSTCPPacket amstcpPacket;
+    public String plcRequestContainerClassName;
 
     @Parameterized.Parameter(4)
+    public AMSTCPPacket amstcpPacket;
+
+    @Parameterized.Parameter(5)
     public String aMSTCPPacketClassName;
 
-    @Parameterized.Parameters(name = " {index} {2} {4}")
+    @Parameterized.Parameters(name = "{index} Type:{0} {3} {5}")
     public static Collection<Object[]> data() {
         AMSNetId targetAmsNetId = AMSNetId.of("1.2.3.4.5.6");
         AMSPort targetAmsPort = AMSPort.of(7);
         AMSNetId sourceAmsNetId = AMSNetId.of("8.9.10.11.12.13");
         AMSPort sourceAmsPort = AMSPort.of(14);
         Invoke invokeId = Invoke.of(2);
-        Data data = Data.of(SerializationUtils.serialize("Hello World!"));
-        return Stream.of(
-            ImmutablePair.of(
-                new PlcRequestContainer<>(
-                    PlcWriteRequest
-                        .builder()
-                        .addItem(ADSAddress.of(1, 2), "HelloWorld!")
-                        .build(), new CompletableFuture<>()),
-                ADSWriteResponse.of(targetAmsNetId, targetAmsPort, sourceAmsNetId, sourceAmsPort, invokeId, Result.of(0))
-            ),
-            ImmutablePair.of(
-                new PlcRequestContainer<>(
-                    PlcReadRequest
-                        .builder()
-                        .addItem(String.class, ADSAddress.of(1, 2))
-                        .build(), new CompletableFuture<>()),
-                ADSReadResponse.of(targetAmsNetId, targetAmsPort, sourceAmsNetId, sourceAmsPort, invokeId, Result.of(0), data)
-            )
-        ).map(pair -> new Object[]{pair.left, pair.left.getResponseFuture(), pair.left.getRequest().getClass().getSimpleName(), pair.right, pair.right.getClass().getSimpleName()}).collect(Collectors.toList());
+        return Stream.of(Boolean.class,
+            Byte.class,
+            Short.class,
+            Calendar.class,
+            Float.class,
+            Integer.class,
+            String.class)
+            .map(clazz -> {
+                if (clazz == Boolean.class) {
+                    return ImmutablePair.of(Boolean.TRUE, new byte[]{0x0});
+                } else if (clazz == Byte.class) {
+                    return ImmutablePair.of(Byte.valueOf("0"), new byte[]{0x0});
+                } else if (clazz == Short.class) {
+                    return ImmutablePair.of(Short.valueOf("0"), new byte[]{0x0, 0x0});
+                } else if (clazz == Calendar.class) {
+                    return ImmutablePair.of(Calendar.getInstance(), new byte[]{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0});
+                } else if (clazz == Float.class) {
+                    return ImmutablePair.of(Float.valueOf("0"), new byte[]{0x0, 0x0, 0x0, 0x0});
+                } else if (clazz == Integer.class) {
+                    return ImmutablePair.of(Integer.valueOf("0"), new byte[]{0x0, 0x0, 0x0, 0x0});
+                } else if (clazz == String.class) {
+                    return ImmutablePair.of(String.valueOf("Hello World!"), new byte[]{0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x20, 0x57, 0x6f, 0x72, 0x6c, 0x64, 0x21, 0x00});
+                } else {
+                    throw new IllegalArgumentException("Unmapped type " + clazz);
+                }
+            })
+            .map(pair -> Stream.of(
+                ImmutablePair.of(
+                    new PlcRequestContainer<>(
+                        PlcWriteRequest
+                            .builder()
+                            .addItem(ADSAddress.of(1, 2), pair.left)
+                            .build(), new CompletableFuture<>()),
+                    ADSWriteResponse.of(targetAmsNetId, targetAmsPort, sourceAmsNetId, sourceAmsPort, invokeId, Result.of(0))
+                ),
+                ImmutablePair.of(
+                    new PlcRequestContainer<>(
+                        PlcReadRequest
+                            .builder()
+                            .addItem(pair.left.getClass(), ADSAddress.of(1, 2))
+                            .build(), new CompletableFuture<>()),
+                    ADSReadResponse.of(targetAmsNetId, targetAmsPort, sourceAmsNetId, sourceAmsPort, invokeId, Result.of(0), Data.of(pair.right))
+                )
+            ))
+            .flatMap(stream -> stream)
+            .map(pair -> new Object[]{pair.left.getRequest().getRequestItem().orElseThrow(IllegalStateException::new).getDatatype().getSimpleName(), pair.left, pair.left.getResponseFuture(), pair.left.getRequest().getClass().getSimpleName(), pair.right, pair.right.getClass().getSimpleName()}).collect(Collectors.toList());
     }
 
     @Before

-- 
To stop receiving notification emails like this one, please contact
sruehl@apache.org.