You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@plc4x.apache.org by GitBox <gi...@apache.org> on 2018/09/11 13:26:20 UTC

[GitHub] asfgit closed pull request #18: Add support for multiple tasks in kafka sink connector

asfgit closed pull request #18: Add support for multiple tasks in kafka sink connector
URL: https://github.com/apache/incubator-plc4x/pull/18
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub will not show its
diff once it has been merged, so the full diff is reproduced below:

diff --git a/examples/iot-factory/src/main/java/org/apache/plc4x/java/examples/iotfactory/IotElasticsearchFactory.java b/examples/iot-factory/src/main/java/org/apache/plc4x/java/examples/iotfactory/IotElasticsearchFactory.java
index 41090f8f5..35e78f4eb 100644
--- a/examples/iot-factory/src/main/java/org/apache/plc4x/java/examples/iotfactory/IotElasticsearchFactory.java
+++ b/examples/iot-factory/src/main/java/org/apache/plc4x/java/examples/iotfactory/IotElasticsearchFactory.java
@@ -148,7 +148,7 @@ private void runFactory() {
 
             // Define the event stream.
             // 1) PLC4X source generating a stream of bytes.
-            Supplier<Byte> plcSupplier = PlcFunctions.byteSupplier(plcAdapter, "OUTPUTS/0");
+            Supplier<Byte> plcSupplier = PlcFunctions.byteSupplier(plcAdapter, "%Q0:BYTE");
             // 2) Use polling to get an item from the byte-stream in regular intervals.
             TStream<Byte> plcOutputStates = top.poll(plcSupplier, 100, TimeUnit.MILLISECONDS);
 
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
index 45ae926eb..fa2e32d62 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
@@ -18,26 +18,20 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.kafka;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.plc4x.kafka.util.VersionUtil;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 public class Plc4xSinkConnector extends SinkConnector {
     static final String URL_CONFIG = "url";
     private static final String URL_DOC = "Connection string used by PLC4X to connect to the PLC";
 
-    static final String QUERY_CONFIG = "query";
-    private static final String QUERY_DOC = "Field query to be sent to the PLC";
-
-    private static final ConfigDef CONFIG_DEF = new ConfigDef()
-        .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC)
-        .define(QUERY_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, QUERY_DOC);
+    static final ConfigDef CONFIG_DEF = new ConfigDef()
+        .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC);
 
     private String url;
     private String query;
@@ -49,18 +43,19 @@ Licensed to the Apache Software Foundation (ASF) under one
 
     @Override
     public List<Map<String, String>> taskConfigs(int maxTasks) {
-        Map<String, String> taskConfig = new HashMap<>();
-        taskConfig.put(URL_CONFIG, url);
-        taskConfig.put(QUERY_CONFIG, query);
-
-        // Only one task will be created; ignoring maxTasks for now
-        return Collections.singletonList(taskConfig);
+        List<Map<String, String>> configs = new LinkedList<>();
+        for (int i = 0; i < maxTasks; i++) {
+            Map<String, String> taskConfig = new HashMap<>();
+            taskConfig.put(URL_CONFIG, url);
+            configs.add(taskConfig);
+        }
+        return configs;
     }
 
     @Override
     public void start(Map<String, String> props) {
-        url = props.get(URL_CONFIG);
-        query = props.get(QUERY_CONFIG);
+        AbstractConfig config = new AbstractConfig(Plc4xSinkConnector.CONFIG_DEF, props);
+        url = config.getString(URL_CONFIG);
     }
 
     @Override
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
index b29418f18..648a32e4a 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
@@ -18,6 +18,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.kafka;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
@@ -33,10 +34,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import java.util.concurrent.ExecutionException;
 
 public class Plc4xSinkTask extends SinkTask {
-    private final static String FIELD_KEY = "key"; // TODO: is this really necessary?
-
     private String url;
-    private String query;
 
     private PlcConnection plcConnection;
     private PlcWriter plcWriter;
@@ -48,8 +46,8 @@ public String version() {
 
     @Override
     public void start(Map<String, String> props) {
-        url = props.get(Plc4xSinkConnector.URL_CONFIG);
-        query = props.get(Plc4xSinkConnector.QUERY_CONFIG);
+        AbstractConfig config = new AbstractConfig(Plc4xSinkConnector.CONFIG_DEF, props);
+        url = config.getString(Plc4xSinkConnector.URL_CONFIG);
 
         openConnection();
 
@@ -65,12 +63,29 @@ public void stop() {
     @Override
     public void put(Collection<SinkRecord> records) {
         for (SinkRecord record: records) {
-            String value = record.value().toString(); // TODO: implement other data types
-            PlcWriteRequest plcRequest = plcWriter.writeRequestBuilder().addItem(FIELD_KEY, query, value).build();
+            String query = record.key().toString();
+            Object value = record.value();
+            PlcWriteRequest.Builder builder = plcWriter.writeRequestBuilder();
+            PlcWriteRequest plcRequest = addToBuilder(builder, query, value).build();
             doWrite(plcRequest);
         }
     }
 
+    // TODO: fix this
+    private PlcWriteRequest.Builder addToBuilder(PlcWriteRequest.Builder builder, String query, Object obj) {
+        Class<?> type = obj.getClass();
+
+        if (type.equals(Integer.class)) {
+            int value = (int) obj;
+            builder.addItem(query, query, value);
+        } else if (type.equals(String.class)) {
+            String value = (String) obj;
+            builder.addItem(query, query, value);
+        }
+
+        return builder;
+    }
+
     private void openConnection() {
         try {
             plcConnection = new PlcDriverManager().getConnection(url);
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
index 4d1d9d026..4d014a535 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
@@ -18,13 +18,15 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.kafka;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.util.ConnectorUtils;
 import org.apache.plc4x.kafka.util.VersionUtil;
 
-import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -35,22 +37,22 @@ Licensed to the Apache Software Foundation (ASF) under one
     static final String URL_CONFIG = "url";
     private static final String URL_DOC = "Connection string used by PLC4X to connect to the PLC";
 
-    static final String QUERY_CONFIG = "query";
-    private static final String QUERY_DOC = "Field query to be sent to the PLC";
+    static final String QUERIES_CONFIG = "queries";
+    private static final String QUERIES_DOC = "Field queries to be sent to the PLC";
 
     static final String RATE_CONFIG = "rate";
     private static final Integer RATE_DEFAULT = 1000;
     private static final String RATE_DOC = "Polling rate";
 
-    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+    static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC)
         .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC)
-        .define(QUERY_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, QUERY_DOC)
+        .define(QUERIES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, QUERIES_DOC)
         .define(RATE_CONFIG, ConfigDef.Type.INT, RATE_DEFAULT, ConfigDef.Importance.MEDIUM, RATE_DOC);
 
     private String topic;
     private String url;
-    private String query;
+    private List<String> queries;
     private Integer rate;
 
     @Override
@@ -60,22 +62,26 @@ Licensed to the Apache Software Foundation (ASF) under one
 
     @Override
     public List<Map<String, String>> taskConfigs(int maxTasks) {
-        Map<String, String> taskConfig = new HashMap<>();
-        taskConfig.put(TOPIC_CONFIG, topic);
-        taskConfig.put(URL_CONFIG, url);
-        taskConfig.put(QUERY_CONFIG, query);
-        taskConfig.put(RATE_CONFIG, rate.toString());
-
-        // Only one task will be created; ignoring maxTasks for now
-        return Collections.singletonList(taskConfig);
+        List<Map<String, String>> configs = new LinkedList<>();
+        List<List<String>> queryGroups = ConnectorUtils.groupPartitions(queries, maxTasks);
+        for (List<String> queryGroup: queryGroups) {
+            Map<String, String> taskConfig = new HashMap<>();
+            taskConfig.put(TOPIC_CONFIG, topic);
+            taskConfig.put(URL_CONFIG, url);
+            taskConfig.put(QUERIES_CONFIG, String.join(",", queryGroup));
+            taskConfig.put(RATE_CONFIG, rate.toString());
+            configs.add(taskConfig);
+        }
+        return configs;
     }
 
     @Override
     public void start(Map<String, String> props) {
-        topic = props.get(TOPIC_CONFIG);
-        url = props.get(URL_CONFIG);
-        query = props.get(QUERY_CONFIG);
-        rate = Integer.valueOf(props.get(RATE_CONFIG));
+        AbstractConfig config = new AbstractConfig(Plc4xSourceConnector.CONFIG_DEF, props);
+        topic = config.getString(TOPIC_CONFIG);
+        url = config.getString(URL_CONFIG);
+        queries = config.getList(QUERIES_CONFIG);
+        rate = config.getInt(RATE_CONFIG);
     }
 
     @Override
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
index 798ae3113..c354a1ea2 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
@@ -18,6 +18,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.kafka;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -32,6 +33,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.plc4x.kafka.util.VersionUtil;
 
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.*;
@@ -45,11 +47,10 @@ Licensed to the Apache Software Foundation (ASF) under one
 public class Plc4xSourceTask extends SourceTask {
     private final static long WAIT_LIMIT_MILLIS = 100;
     private final static long TIMEOUT_LIMIT_MILLIS = 5000;
-    private final static String FIELD_KEY = "key"; // TODO: is this really necessary?
 
     private String topic;
     private String url;
-    private String query;
+    private List<String> queries;
 
     private PlcConnection plcConnection;
     private PlcReader plcReader;
@@ -67,16 +68,22 @@ public String version() {
 
     @Override
     public void start(Map<String, String> props) {
-        topic = props.get(Plc4xSourceConnector.TOPIC_CONFIG);
-        url = props.get(Plc4xSourceConnector.URL_CONFIG);
-        query = props.get(Plc4xSourceConnector.QUERY_CONFIG);
+        AbstractConfig config = new AbstractConfig(Plc4xSourceConnector.CONFIG_DEF, props);
+        topic = config.getString(Plc4xSourceConnector.TOPIC_CONFIG);
+        url = config.getString(Plc4xSourceConnector.URL_CONFIG);
+        queries = config.getList(Plc4xSourceConnector.QUERIES_CONFIG);
 
         openConnection();
 
         plcReader = plcConnection.getReader()
             .orElseThrow(() -> new ConnectException("PlcReader not available for this type of connection"));
 
-        plcRequest = plcReader.readRequestBuilder().addItem(FIELD_KEY, query).build();
+
+        PlcReadRequest.Builder builder = plcReader.readRequestBuilder();
+        for (String query : queries) {
+            builder.addItem(query, query);
+        }
+        plcRequest = builder.build();
 
         int rate = Integer.valueOf(props.get(Plc4xSourceConnector.RATE_CONFIG));
         scheduler = Executors.newScheduledThreadPool(1);
@@ -152,30 +159,35 @@ private synchronized boolean awaitFetch(long milliseconds) throws InterruptedExc
     }
 
     private List<SourceRecord> extractValues(PlcReadResponse<?> response) {
-        final PlcResponseCode rc = response.getResponseCode(FIELD_KEY);
-
-        if (!rc.equals(PlcResponseCode.OK))
-            return null; // TODO: should we really ignore this?
-
-        Object rawValue = response.getObject(FIELD_KEY);
-        Schema valueSchema = getSchema(rawValue.getClass());
-        Object value = valueSchema.equals(Schema.STRING_SCHEMA) ? rawValue.toString() : rawValue;
-        Long timestamp = System.currentTimeMillis();
-        Map<String, String> sourcePartition = Collections.singletonMap("url", url);
-        Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);
-
-        SourceRecord record =
-            new SourceRecord(
-                sourcePartition,
-                sourceOffset,
-                topic,
-                Schema.STRING_SCHEMA,
-                query,
-                valueSchema,
-                value
-            );
-
-        return Collections.singletonList(record); // TODO: what if there are multiple values?
+        final List<SourceRecord> result = new LinkedList<>();
+        for (String query : queries) {
+            final PlcResponseCode rc = response.getResponseCode(query);
+            if (!rc.equals(PlcResponseCode.OK))  {
+                continue;
+            }
+
+            Object rawValue = response.getObject(query);
+            Schema valueSchema = getSchema(rawValue.getClass());
+            Object value = valueSchema.equals(Schema.STRING_SCHEMA) ? rawValue.toString() : rawValue;
+            Long timestamp = System.currentTimeMillis();
+            Map<String, String> sourcePartition = Collections.singletonMap("url", url);
+            Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);
+
+            SourceRecord record =
+                new SourceRecord(
+                    sourcePartition,
+                    sourceOffset,
+                    topic,
+                    Schema.STRING_SCHEMA,
+                    query,
+                    valueSchema,
+                    value
+                );
+
+            result.add(record);
+        }
+
+        return result;
     }
 
     private Schema getSchema(Class<?> type) {
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/items/DefaultBooleanFieldItem.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/items/DefaultBooleanFieldItem.java
index 9f8db5d33..239c47802 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/items/DefaultBooleanFieldItem.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/items/DefaultBooleanFieldItem.java
@@ -18,6 +18,8 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.java.base.messages.items;
 
+import java.util.BitSet;
+
 public class DefaultBooleanFieldItem extends FieldItem<Boolean> {
 
     public DefaultBooleanFieldItem(Boolean... values) {
@@ -42,5 +44,26 @@ public Boolean getBoolean(int index) {
         return null;
     }
 
+    @Override
+    public boolean isValidByte(int index) {
+        int byteAddress = index >> 3;
+        return (getValue(byteAddress) != null);
+    }
+
+    @Override
+    public Byte getByte(int index) {
+        BitSet bitSet = new BitSet();
+        int i = 0;
+        for (Boolean value : getValues()) {
+            bitSet.set(i, value);
+            i++;
+        }
+        byte[] bytes = bitSet.toByteArray();
+        if(bytes.length < index) {
+            return null;
+        }
+        return bytes[index];
+    }
+
 }
 
diff --git a/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/netty/Plc4XModbusProtocol.java b/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/netty/Plc4XModbusProtocol.java
index a862ea06d..e5664074f 100644
--- a/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/netty/Plc4XModbusProtocol.java
+++ b/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/netty/Plc4XModbusProtocol.java
@@ -27,23 +27,24 @@ Licensed to the Apache Software Foundation (ASF) under one
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToMessageCodec;
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.plc4x.java.api.exceptions.PlcException;
 import org.apache.plc4x.java.api.exceptions.PlcNotImplementedException;
 import org.apache.plc4x.java.api.exceptions.PlcProtocolException;
 import org.apache.plc4x.java.api.exceptions.PlcUnsupportedDataTypeException;
 import org.apache.plc4x.java.api.messages.*;
 import org.apache.plc4x.java.api.types.PlcResponseCode;
-import org.apache.plc4x.java.base.messages.InternalPlcRequest;
-import org.apache.plc4x.java.base.messages.InternalPlcResponse;
-import org.apache.plc4x.java.base.messages.PlcRequestContainer;
+import org.apache.plc4x.java.base.messages.*;
+import org.apache.plc4x.java.base.messages.items.DefaultBooleanFieldItem;
+import org.apache.plc4x.java.base.messages.items.DefaultIntegerFieldItem;
+import org.apache.plc4x.java.base.messages.items.FieldItem;
 import org.apache.plc4x.java.modbus.model.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.math.BigInteger;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -71,7 +72,7 @@ protected void encode(ChannelHandlerContext ctx, PlcRequestContainer<InternalPlc
     }
 
     private void encodeWriteRequest(PlcRequestContainer<InternalPlcRequest, InternalPlcResponse> msg, List<Object> out) throws PlcException {
-        PlcWriteRequest request = (PlcWriteRequest) msg.getRequest();
+        InternalPlcWriteRequest request = (InternalPlcWriteRequest) msg.getRequest();
 
         // TODO: support multiple requests
         if(request.getFieldNames().size() != 1) {
@@ -80,7 +81,7 @@ private void encodeWriteRequest(PlcRequestContainer<InternalPlcRequest, Internal
         // TODO: check if we can map like this. Implication is that we can only work with int, short, byte and boolean
         // TODO: for higher data types float, double etc we might need to split the bytes into chunks
         String fieldName = request.getFieldNames().iterator().next();
-        int quantity = request.getNumValues(fieldName);
+        int quantity = request.getNumberOfValues(fieldName);
         short unitId = 0;
 
         /*
@@ -107,7 +108,7 @@ private void encodeWriteRequest(PlcRequestContainer<InternalPlcRequest, Internal
         if (field instanceof RegisterModbusField) {
             RegisterModbusField registerModbusField = (RegisterModbusField) field;
             if (quantity > 1) {
-                byte[] bytesToWrite = flattenByteValues(request.getValues(fieldName));
+                byte[] bytesToWrite = flattenByteValues(request.getFieldItem(fieldName).getValues());
                 // A register is a 16 bit (2 byte) value ... so every value needs 2 byte.
                 int requiredLength = 2 * quantity;
                 if (bytesToWrite.length != requiredLength) {
@@ -115,9 +116,10 @@ private void encodeWriteRequest(PlcRequestContainer<InternalPlcRequest, Internal
                 }
                 modbusRequest = new WriteMultipleRegistersRequest(registerModbusField.getAddress(), quantity, bytesToWrite);
             } else {
-                byte[] register = request.getValues(fieldName)[0];
-                if (register.length != 2) {
-                    throw new PlcProtocolException("Invalid register values created. Should be 2 bytes. Was " + register.length);
+                byte[] register = flattenByteValue(request.getFieldItem(fieldName).getValues()[0]);
+                if ((register == null) || (register.length != 2)) {
+                    throw new PlcProtocolException("Invalid register values created. Should be 2 bytes. Was " +
+                        ((register != null) ? register.length : 0));
                 }
                 // Reconvert the two bytes back to an int.
                 int intToWrite = register[0] << 8 | register[1] & 0xff;
@@ -126,16 +128,18 @@ private void encodeWriteRequest(PlcRequestContainer<InternalPlcRequest, Internal
         } else if (field instanceof CoilModbusField) {
             CoilModbusField coilModbusField = (CoilModbusField) field;
             if (quantity > 1) {
-                byte[] bytesToWrite = flattenBitValues(request.getValues(fieldName));
+                byte[] bytesToWrite = flattenBitValues(request.getFieldItem(fieldName).getValues());
                 // As each coil value represents a bit, the number of bytes needed
                 // equals "ceil(quantity/8)" (a 3 bit shift is a division by 8 ... the +1 is the "ceil")
                 int requiredLength = (quantity >> 3) + 1;
                 if (bytesToWrite.length != requiredLength) {
-                    throw new PlcProtocolException("Invalid coil values created. Should be big enough to transport N bits. Was " + bytesToWrite.length + ", expected " + requiredLength);
+                    throw new PlcProtocolException(
+                        "Invalid coil values created. Should be big enough to transport N bits. Was " +
+                            bytesToWrite.length + ", expected " + requiredLength);
                 }
                 modbusRequest = new WriteMultipleCoilsRequest(coilModbusField.getAddress(), quantity, bytesToWrite);
             } else {
-                boolean booleanToWrite = produceCoilValue(request.getValues(fieldName));
+                boolean booleanToWrite = produceCoilValue(request.getFieldItem(fieldName).getValues());
                 modbusRequest = new WriteSingleCoilRequest(coilModbusField.getAddress(), booleanToWrite);
             }
         } else if (field instanceof MaskWriteRegisterModbusField) {
@@ -202,17 +206,19 @@ protected void decode(ChannelHandlerContext ctx, ModbusTcpPayload msg, List<Obje
         LOGGER.debug("{}: transactionId: {}, unitId: {}, modbusPdu:{}", msg, msg.getTransactionId(), msg.getUnitId(), msg.getModbusPdu());
         // TODO: implement me
         short transactionId = msg.getTransactionId();
-        PlcRequestContainer<PlcRequest, PlcResponse> plcRequestContainer = requestsMap.get(transactionId);
+        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse> plcRequestContainer = requestsMap.get(transactionId);
         if (plcRequestContainer == null) {
             throw new PlcProtocolException("Unrelated payload received. [transactionId: " + msg.getTransactionId() + ", unitId: " + msg.getUnitId() + ", modbusPdu: " + msg.getModbusPdu() + "]");
         }
 
         // TODO: only single Item supported for now
-        PlcRequest request = plcRequestContainer.getRequest();
+        InternalPlcFieldRequest request = (InternalPlcFieldRequest) plcRequestContainer.getRequest();
         // TODO: support multiple requests (Shouldn't be needed as the request wouldn't have been sent)
         if(request.getFieldNames().size() != 1) {
             throw new PlcNotImplementedException("Only single message supported for now");
         }
+        String fieldName = request.getFieldNames().iterator().next();
+        ModbusField field = (ModbusField) request.getField(fieldName);
 
         ModbusPdu modbusPdu = msg.getModbusPdu();
         short unitId = msg.getUnitId();
@@ -221,57 +227,75 @@ protected void decode(ChannelHandlerContext ctx, ModbusTcpPayload msg, List<Obje
             // TODO: finish implementation
             WriteMultipleCoilsResponse writeMultipleCoilsResponse = (WriteMultipleCoilsResponse) modbusPdu;
             LOGGER.debug("{}: address:{}, quantity:{}", writeMultipleCoilsResponse, writeMultipleCoilsResponse.getAddress(), writeMultipleCoilsResponse.getQuantity());
-            plcRequestContainer.getResponseFuture().complete(new PlcWriteResponse((PlcWriteRequest) request, new PlcWriteResponseItem<>((PlcWriteRequestItem) requestItem, PlcResponseCode.OK)));
+            Map<String, PlcResponseCode> responseValues = new HashMap<>();
+            responseValues.put(fieldName, PlcResponseCode.OK);
+            plcRequestContainer.getResponseFuture().complete(new DefaultPlcWriteResponse((InternalPlcWriteRequest) request, responseValues));
         } else if (modbusPdu instanceof WriteMultipleRegistersResponse) {
             // TODO: finish implementation
             WriteMultipleRegistersResponse writeMultipleRegistersResponse = (WriteMultipleRegistersResponse) modbusPdu;
             LOGGER.debug("{}: address:{}, quantity:{}", writeMultipleRegistersResponse, writeMultipleRegistersResponse.getAddress(), writeMultipleRegistersResponse.getQuantity());
-            plcRequestContainer.getResponseFuture().complete(new PlcWriteResponse((PlcWriteRequest) request, new PlcWriteResponseItem<>((PlcWriteRequestItem) requestItem, PlcResponseCode.OK)));
+            Map<String, PlcResponseCode> responseValues = new HashMap<>();
+            responseValues.put(fieldName, PlcResponseCode.OK);
+            plcRequestContainer.getResponseFuture().complete(new DefaultPlcWriteResponse((InternalPlcWriteRequest) request, responseValues));
         } else if (modbusPdu instanceof WriteSingleCoilResponse) {
             // TODO: finish implementation
             WriteSingleCoilResponse writeSingleCoilResponse = (WriteSingleCoilResponse) modbusPdu;
             LOGGER.debug("{}: address:{}, value:{}", writeSingleCoilResponse, writeSingleCoilResponse.getAddress(), writeSingleCoilResponse.getValue());
-            plcRequestContainer.getResponseFuture().complete(new PlcWriteResponse((PlcWriteRequest) request, new PlcWriteResponseItem<>((PlcWriteRequestItem) requestItem, PlcResponseCode.OK)));
+            Map<String, PlcResponseCode> responseValues = new HashMap<>();
+            responseValues.put(fieldName, PlcResponseCode.OK);
+            plcRequestContainer.getResponseFuture().complete(new DefaultPlcWriteResponse((InternalPlcWriteRequest) request, responseValues));
         } else if (modbusPdu instanceof WriteSingleRegisterResponse) {
             // TODO: finish implementation
             WriteSingleRegisterResponse writeSingleRegisterResponse = (WriteSingleRegisterResponse) modbusPdu;
             LOGGER.debug("{}: address:{}, value:{}", writeSingleRegisterResponse, writeSingleRegisterResponse.getAddress(), writeSingleRegisterResponse.getValue());
-            plcRequestContainer.getResponseFuture().complete(new PlcWriteResponse((PlcWriteRequest) request, new PlcWriteResponseItem<>((PlcWriteRequestItem) requestItem, PlcResponseCode.OK)));
+            Map<String, PlcResponseCode> responseValues = new HashMap<>();
+            responseValues.put(fieldName, PlcResponseCode.OK);
+            plcRequestContainer.getResponseFuture().complete(new DefaultPlcWriteResponse((InternalPlcWriteRequest) request, responseValues));
         } else if (modbusPdu instanceof ReadCoilsResponse) {
             // TODO: finish implementation
             ReadCoilsResponse readCoilsResponse = (ReadCoilsResponse) modbusPdu;
             LOGGER.debug("{}: Nothing", readCoilsResponse);
             ByteBuf byteBuf = readCoilsResponse.getCoilStatus();
-            List<?> data = produceCoilValueList(requestItem, dataType, byteBuf);
-            plcRequestContainer.getResponseFuture().complete(new PlcReadResponse((PlcReadRequest) request, new PlcReadResponseItem((PlcReadRequestItem) requestItem, PlcResponseCode.OK, data)));
+            List<?> data = produceCoilValueList(field, byteBuf);
+            Map<String, Pair<PlcResponseCode, FieldItem>> responseValues = new HashMap<>();
+            responseValues.put(fieldName, new ImmutablePair<>(PlcResponseCode.OK, new DefaultBooleanFieldItem((Boolean[]) data.toArray(new Boolean[0]))));
+            plcRequestContainer.getResponseFuture().complete(new DefaultPlcReadResponse((InternalPlcReadRequest) request, responseValues));
         } else if (modbusPdu instanceof ReadDiscreteInputsResponse) {
             // TODO: finish implementation
             ReadDiscreteInputsResponse readDiscreteInputsResponse = (ReadDiscreteInputsResponse) modbusPdu;
             LOGGER.debug("{}: Nothing", readDiscreteInputsResponse);
             ByteBuf byteBuf = readDiscreteInputsResponse.getInputStatus();
-            List<?> data = produceCoilValueList(requestItem, dataType, byteBuf);
-            plcRequestContainer.getResponseFuture().complete(new PlcReadResponse((PlcReadRequest) request, new PlcReadResponseItem((PlcReadRequestItem) requestItem, PlcResponseCode.OK, data)));
+            List<?> data = produceCoilValueList(field, byteBuf);
+            Map<String, Pair<PlcResponseCode, FieldItem>> responseValues = new HashMap<>();
+            responseValues.put(fieldName, new ImmutablePair<>(PlcResponseCode.OK, new DefaultBooleanFieldItem((Boolean[]) data.toArray(new Boolean[0]))));
+            plcRequestContainer.getResponseFuture().complete(new DefaultPlcReadResponse((InternalPlcReadRequest) request, responseValues));
         } else if (modbusPdu instanceof ReadHoldingRegistersResponse) {
             // TODO: finish implementation
             ReadHoldingRegistersResponse readHoldingRegistersResponse = (ReadHoldingRegistersResponse) modbusPdu;
             LOGGER.debug("{}: Nothing", readHoldingRegistersResponse);
             ByteBuf byteBuf = readHoldingRegistersResponse.getRegisters();
             // TODO: use register method
-            List<?> data = produceRegisterValueList(requestItem, dataType, byteBuf);
-            plcRequestContainer.getResponseFuture().complete(new PlcReadResponse((PlcReadRequest) request, new PlcReadResponseItem((PlcReadRequestItem) requestItem, PlcResponseCode.OK, data)));
+            List<?> data = produceRegisterValueList(field, byteBuf);
+            Map<String, Pair<PlcResponseCode, FieldItem>> responseValues = new HashMap<>();
+            responseValues.put(fieldName, new ImmutablePair<>(PlcResponseCode.OK, new DefaultBooleanFieldItem((Boolean[]) data.toArray(new Boolean[0]))));
+            plcRequestContainer.getResponseFuture().complete(new DefaultPlcReadResponse((InternalPlcReadRequest) request, responseValues));
         } else if (modbusPdu instanceof ReadInputRegistersResponse) {
             // TODO: finish implementation
             ReadInputRegistersResponse readInputRegistersResponse = (ReadInputRegistersResponse) modbusPdu;
             LOGGER.debug("{}: Nothing", readInputRegistersResponse);
             ByteBuf byteBuf = readInputRegistersResponse.getRegisters();
             // TODO: use register method
-            List<?> data = produceRegisterValueList(requestItem, dataType, byteBuf);
-            plcRequestContainer.getResponseFuture().complete(new PlcReadResponse((PlcReadRequest) request, new PlcReadResponseItem((PlcReadRequestItem) requestItem, PlcResponseCode.OK, data)));
+            List<?> data = produceRegisterValueList(field, byteBuf);
+            Map<String, Pair<PlcResponseCode, FieldItem>> responseValues = new HashMap<>();
+            responseValues.put(fieldName, new ImmutablePair<>(PlcResponseCode.OK, new DefaultIntegerFieldItem((Short[]) data.toArray(new Short[0]))));
+            plcRequestContainer.getResponseFuture().complete(new DefaultPlcReadResponse((InternalPlcReadRequest) request, responseValues));
         } else if (modbusPdu instanceof MaskWriteRegisterResponse) {
             // TODO: finish implementation
             MaskWriteRegisterResponse maskWriteRegisterResponse = (MaskWriteRegisterResponse) modbusPdu;
             LOGGER.debug("{}: Nothing", maskWriteRegisterResponse);
-            plcRequestContainer.getResponseFuture().complete(new PlcWriteResponse((PlcWriteRequest) request, new PlcWriteResponseItem<>((PlcWriteRequestItem) requestItem, PlcResponseCode.OK)));
+            Map<String, PlcResponseCode> responseValues = new HashMap<>();
+            responseValues.put(fieldName, PlcResponseCode.OK);
+            plcRequestContainer.getResponseFuture().complete(new DefaultPlcWriteResponse((InternalPlcWriteRequest) request, responseValues));
         } else if (modbusPdu instanceof ExceptionResponse) {
             ExceptionResponse exceptionResponse = (ExceptionResponse) modbusPdu;
             throw new PlcProtocolException("Error received " + exceptionResponse.getExceptionCode());
@@ -290,15 +314,15 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
     // Encoding helpers.
     ////////////////////////////////////////////////////////////////////////////////
 
-    private boolean produceCoilValue(List<?> values) throws PlcProtocolException {
-        if (values.size() != 1) {
+    private boolean produceCoilValue(Object[] values) throws PlcProtocolException {
+        if (values.length != 1) {
             throw new PlcProtocolException("Only one value allowed");
         }
         byte multiCoil = produceCoilValues(values)[0];
         return multiCoil != 0;
     }
 
-    private byte[] produceCoilValues(List<?> values) throws PlcProtocolException {
+    private byte[] produceCoilValues(Object[] values) throws PlcProtocolException {
         List<Byte> coils = new LinkedList<>();
         byte actualCoil = 0;
         int i = 7;
@@ -366,7 +390,12 @@ private boolean produceCoilValue(List<?> values) throws PlcProtocolException {
         return ArrayUtils.toPrimitive(coils.toArray(new Byte[0]));
     }
 
-    private byte[] flattenByteValues(byte[][] values) {
+    private byte[] flattenByteValue(Object value) {
+        // TODO: Implement ...
+        return null;
+    }
+
+    private byte[] flattenByteValues(Object[] values) {
         byte[] rawValues = new byte[values.length * values[0].length];
         for(int i = 0; i < values.length; i ++) {
             byte[] value = values[i];
@@ -375,7 +404,7 @@ private boolean produceCoilValue(List<?> values) throws PlcProtocolException {
         return rawValues;
     }
 
-    private byte[] flattenBitValues(byte[][] values) {
+    private byte[] flattenBitValues(Object[] values) {
         byte[] rawValues = new byte[values.length * values[0].length];
         for(int i = 0; i < values.length; i ++) {
             byte[] value = values[i];
@@ -470,7 +499,7 @@ private boolean produceCoilValue(List<?> values) throws PlcProtocolException {
     ////////////////////////////////////////////////////////////////////////////////
     // Decoding helpers.
     ////////////////////////////////////////////////////////////////////////////////
-    private <T> List<T> produceCoilValueList(RequestItem requestItem, Class<T> dataType, ByteBuf byteBuf) {
+    private <T> List<T> produceCoilValueList(ModbusField field, ByteBuf byteBuf) {
         PlcReadRequestItem readRequestItem = (PlcReadRequestItem) requestItem;
         byte[] bytes = new byte[byteBuf.readableBytes()];
         if (bytes.length < 1) {
@@ -530,7 +559,7 @@ private boolean produceCoilValue(List<?> values) throws PlcProtocolException {
         return data;
     }
 
-    private <T> List<T> produceRegisterValueList(RequestItem requestItem, Class<T> dataType, ByteBuf byteBuf) throws PlcProtocolException {
+    private <T> List<T> produceRegisterValueList(ModbusField field, ByteBuf byteBuf) throws PlcProtocolException {
         PlcReadRequestItem readRequestItem = (PlcReadRequestItem) requestItem;
         int readableBytes = byteBuf.readableBytes();
         if (readableBytes % 2 != 0) {
diff --git a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java
index 65cce8aed..3966deb8a 100644
--- a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java
+++ b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java
@@ -51,15 +51,16 @@ Licensed to the Apache Software Foundation (ASF) under one
 
     void set(TestField field, FieldItem value) {
         Objects.requireNonNull(field);
-        switch(field.getType()) {
+        switch (field.getType()) {
             case STATE:
                 state.put(field, value);
                 return;
             case STDOUT:
-                System.out.printf("TEST PLC: %s%n", value.getObject(0).toString());
+                System.out.printf("TEST PLC STDOUT [%s]: %s%n", field.getName(), Objects.toString(value.getValues()[0]));
                 return;
             case RANDOM:
-                // Just ignore this ...
+                System.out.printf("TEST PLC RANDOM [%s]: %s%n", field.getName(), Objects.toString(value.getValues()[0]));
+                return;
         }
         throw new IllegalArgumentException("Unsupported field type: " + field.getType().name());
     }
diff --git a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestFieldHandler.java b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestFieldHandler.java
index 7c9f28676..5953da6f3 100644
--- a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestFieldHandler.java
+++ b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestFieldHandler.java
@@ -28,6 +28,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.util.Arrays;
 
 public class TestFieldHandler implements PlcFieldHandler {
 
@@ -52,7 +53,7 @@ public FieldItem encodeBoolean(PlcField field, Object[] values) {
     public FieldItem encodeByte(PlcField field, Object[] values) {
         TestField testField = (TestField) field;
         if(testField.getDataType() == Byte.class) {
-            return new DefaultIntegerFieldItem((Long[]) values);
+            return new DefaultIntegerFieldItem(Arrays.stream(values).map(x -> new Long((Byte) x)).toArray(Long[]::new));
         }
         throw new PlcRuntimeException("Invalid encoder for type " + testField.getDataType().getName());
     }
@@ -61,7 +62,7 @@ public FieldItem encodeByte(PlcField field, Object[] values) {
     public FieldItem encodeShort(PlcField field, Object[] values) {
         TestField testField = (TestField) field;
         if(testField.getDataType() == Short.class) {
-            return new DefaultIntegerFieldItem((Long[]) values);
+            return new DefaultIntegerFieldItem(Arrays.stream(values).map(x -> new Long((Short) x)).toArray(Long[]::new));
         }
         throw new PlcRuntimeException("Invalid encoder for type " + testField.getDataType().getName());
     }
@@ -70,7 +71,7 @@ public FieldItem encodeShort(PlcField field, Object[] values) {
     public FieldItem encodeInteger(PlcField field, Object[] values) {
         TestField testField = (TestField) field;
         if(testField.getDataType() == Integer.class) {
-            return new DefaultIntegerFieldItem((Long[]) values);
+            return new DefaultIntegerFieldItem(Arrays.stream(values).map(x -> new Long((Integer) x)).toArray(Long[]::new));
         }
         throw new PlcRuntimeException("Invalid encoder for type " + testField.getDataType().getName());
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services