You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2018/08/29 11:56:05 UTC

[incubator-plc4x] branch feature/api-redesign-chris-c updated: Some further fine-tuning after porting the examples and the edgent-integration

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch feature/api-redesign-chris-c
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git


The following commit(s) were added to refs/heads/feature/api-redesign-chris-c by this push:
     new 14bed12  Some further fine-tuning after porting the examples and the edgent-integration
14bed12 is described below

commit 14bed124cbd133607ef59c0f8e8c7b8a52001451
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Wed Aug 29 13:55:58 2018 +0200

    Some further fine-tuning after porting the examples and the edgent-integration
---
 .../azure/iothub/S7PlcToAzureIoTHubSample.java     |  34 +-
 .../dummydriver/connection/DummyConnection.java    |  12 +
 .../java/examples/kafkabridge/KafkaBridge.java     |  41 +--
 .../apache/plc4x/edgent/PlcConnectionAdapter.java  | 315 ++++++++++++------
 .../java/org/apache/plc4x/edgent/PlcFunctions.java | 360 +++++++++++++--------
 .../plc4x/edgent/PlcConnectionAdapterTest.java     |  26 +-
 .../org/apache/plc4x/edgent/PlcFunctionsTest.java  |   4 +-
 .../apache/plc4x/edgent/mock/MockConnection.java   |  12 +-
 .../plc4x/java/api/connection/PlcWriter.java       |   3 +-
 .../plc4x/java/api/messages/PlcReadResponse.java   |  14 +
 .../plc4x/java/api/messages/PlcWriteRequest.java   |  26 +-
 .../plc4x/java/api/types/PlcClientDatatype.java    |  37 +++
 12 files changed, 562 insertions(+), 322 deletions(-)

diff --git a/examples/azure/src/main/java/org/apache/plc4x/java/examples/azure/iothub/S7PlcToAzureIoTHubSample.java b/examples/azure/src/main/java/org/apache/plc4x/java/examples/azure/iothub/S7PlcToAzureIoTHubSample.java
index 7c399e0..4e6830f 100644
--- a/examples/azure/src/main/java/org/apache/plc4x/java/examples/azure/iothub/S7PlcToAzureIoTHubSample.java
+++ b/examples/azure/src/main/java/org/apache/plc4x/java/examples/azure/iothub/S7PlcToAzureIoTHubSample.java
@@ -24,9 +24,8 @@ import com.microsoft.azure.sdk.iot.device.Message;
 import org.apache.plc4x.java.PlcDriverManager;
 import org.apache.plc4x.java.api.connection.PlcConnection;
 import org.apache.plc4x.java.api.connection.PlcReader;
-import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadRequest;
-import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadResponse;
-import org.apache.plc4x.java.api.model.PlcField;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,35 +35,45 @@ public class S7PlcToAzureIoTHubSample {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(S7PlcToAzureIoTHubSample.class);
 
+    private static final String FIELD_NAME = "value";
+
     /**
      * Example code to demonstrate sending events from an S7 device to Microsoft Azure IoT Hub
      *
-     * @param args Expected: [plc4x connection string, plc4x address, IoT-Hub connection string].
+     * @param args Expected: [plc4x connection string, plc4x field address, IoT-Hub connection string].
      */
     public static void main(String[] args) throws Exception {
+        if(args.length != 3) {
+            System.out.println("Usage: S7PlcToAzureIoTHubSample " +
+                "{plc4x-connection-string} {plc4x-field-address} {iot-hub-connection-string}");
+            return;
+        }
+
         String plc4xConnectionString = args[0];
         String addressString = args[1];
         String iotConnectionString = args[2];
         LOGGER.info("Connecting {}, {}, {}", plc4xConnectionString, addressString, iotConnectionString);
+
+        // Open a connection to the remote PLC.
         try (PlcConnection plcConnection = new PlcDriverManager().getConnection(plc4xConnectionString)) {
             LOGGER.info("Connected");
 
+            // Open a connection to the cloud service.
             DeviceClient client = new DeviceClient(iotConnectionString, IotHubClientProtocol.MQTT);
             client.open();
 
+            // Get a reader instance.
             PlcReader plcReader = plcConnection.getReader().orElseThrow(IllegalStateException::new);
 
-            PlcField outputs = plcConnection.prepareField(addressString);
+            // Prepare a read request.
+            PlcReadRequest request = plcReader.readRequestBuilder().addItem(FIELD_NAME, addressString).build();
 
             while (!Thread.currentThread().isInterrupted()) {
                 // Simulate telemetry.
-                TypeSafePlcReadResponse<Byte> plcReadResponse = plcReader.read(
-                    new TypeSafePlcReadRequest<>(Byte.class, outputs)).get();
-
-                plcReadResponse.getResponseItems().stream()
-                    .flatMap(readResponseItem -> readResponseItem.getValues().stream())
-                    .forEach(byteValue -> {
-                            String result = Long.toBinaryString(byteValue.longValue());
+                PlcReadResponse response = plcReader.read(request).get();
+                response.getAllLongs(FIELD_NAME)
+                    .forEach(longValue -> {
+                            String result = Long.toBinaryString(longValue);
                             LOGGER.info("Outputs {}", result);
                             Message msg = new Message("{ \"bits\" : \"" + result + "\"}");
 
@@ -73,6 +82,7 @@ public class S7PlcToAzureIoTHubSample {
                         }
                     );
 
+                // Wait a second.
                 TimeUnit.SECONDS.sleep(1);
             }
 
diff --git a/examples/dummy-driver/src/main/java/org/apache/plc4x/java/examples/dummydriver/connection/DummyConnection.java b/examples/dummy-driver/src/main/java/org/apache/plc4x/java/examples/dummydriver/connection/DummyConnection.java
index 36c6e5b..213ee3d 100644
--- a/examples/dummy-driver/src/main/java/org/apache/plc4x/java/examples/dummydriver/connection/DummyConnection.java
+++ b/examples/dummy-driver/src/main/java/org/apache/plc4x/java/examples/dummydriver/connection/DummyConnection.java
@@ -65,6 +65,12 @@ public class DummyConnection extends AbstractPlcConnection implements PlcReader,
     }
 
     @Override
+    public PlcReadRequest.Builder readRequestBuilder() {
+        // TODO: Implement this ...
+        return null;
+    }
+
+    @Override
     public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) {
         CompletableFuture<PlcReadResponse> readFuture = new CompletableFuture<>();
         PlcRequestContainer<PlcReadRequest, PlcReadResponse> container =
@@ -74,6 +80,12 @@ public class DummyConnection extends AbstractPlcConnection implements PlcReader,
     }
 
     @Override
+    public PlcWriteRequest.Builder writeRequestBuilder() {
+        // TODO: Implement this ...
+        return null;
+    }
+
+    @Override
     public CompletableFuture<PlcWriteResponse> write(PlcWriteRequest writeRequest) {
         CompletableFuture<PlcWriteResponse> writeFuture = new CompletableFuture<>();
         PlcRequestContainer<PlcWriteRequest, PlcWriteResponse> container =
diff --git a/examples/kafka-bridge/src/main/java/org/apache/plc4x/java/examples/kafkabridge/KafkaBridge.java b/examples/kafka-bridge/src/main/java/org/apache/plc4x/java/examples/kafkabridge/KafkaBridge.java
index 509687e..2c8df14 100644
--- a/examples/kafka-bridge/src/main/java/org/apache/plc4x/java/examples/kafkabridge/KafkaBridge.java
+++ b/examples/kafka-bridge/src/main/java/org/apache/plc4x/java/examples/kafkabridge/KafkaBridge.java
@@ -35,13 +35,9 @@ import org.apache.edgent.topology.TStream;
 import org.apache.edgent.topology.Topology;
 import org.apache.plc4x.edgent.PlcConnectionAdapter;
 import org.apache.plc4x.edgent.PlcFunctions;
-import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.exceptions.PlcException;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
-import org.apache.plc4x.java.api.messages.items.PlcReadRequestItem;
-import org.apache.plc4x.java.api.messages.items.PlcReadResponseItem;
-import org.apache.plc4x.java.api.model.PlcField;
 import org.apache.plc4x.java.examples.kafkabridge.model.PlcFieldConfig;
 import org.apache.plc4x.java.examples.kafkabridge.model.Configuration;
 import org.apache.plc4x.java.examples.kafkabridge.model.PlcMemoryBlock;
@@ -79,26 +75,14 @@ public class KafkaBridge {
         Topology top = dp.newTopology("kafka-bridge");
 
         // Build the entire request.
-        Map<PlcReadRequestItem, String> names = new HashMap<>();
-        PlcReadRequest readRequest = new PlcReadRequest();
+        PlcReadRequest.Builder builder = plcAdapter.readRequestBuilder();
         for(PlcMemoryBlock plcMemoryBlock : config.getPlcConfig().getPlcMemoryBlocks()) {
             for (PlcFieldConfig address : config.getPlcConfig().getPlcFields()) {
-                try {
-                    PlcField field = plcAdapter.prepareField(
-                            "DATA_BLOCKS/" + plcMemoryBlock.getAddress() + "/" + address.getAddress());
-                    PlcReadRequestItem readItem = new PlcReadRequestItem<>(address.getType(), field,
-                            +address.getSize());
-                    readRequest.addItem(readItem);
-                    names.put(readItem, plcMemoryBlock.getName() + "/" + address.getName());
-                } catch (PlcConnectionException e) {
-                    logger.error("Error connecting to remote", e);
-                    throw e;
-                } catch (PlcException e) {
-                    logger.error("Error parsing address {}", address.getAddress(), e);
-                    throw e;
-                }
+                builder = builder.addItem(plcMemoryBlock.getName() + "/" + address.getName(),
+                    "DATA_BLOCKS/" + plcMemoryBlock.getAddress() + "/" + address.getAddress());
             }
         }
+        PlcReadRequest readRequest = builder.build();
 
         // Create a supplier that is able to read the batch we just created.
         Supplier<PlcReadResponse> plcSupplier = PlcFunctions.batchSupplier(plcAdapter, readRequest);
@@ -109,18 +93,15 @@ public class KafkaBridge {
         // Convert the byte into a string.
         TStream<String> jsonSource = source.map(value -> {
             JsonObject jsonObject = new JsonObject();
-            for (PlcReadResponseItem<?> readResponseItem : value.getResponseItems()) {
-                String name = names.get(readResponseItem.getRequestItem());
-                if(readResponseItem.getValues().size() == 1) {
-                    jsonObject.addProperty(name, Byte.toString((Byte) readResponseItem.getValues().get(0)));
-                } else if (readResponseItem.getValues().size() > 1) {
+            value.getFieldNames().forEach(fieldName -> {
+                if(value.getNumValues(fieldName) == 1) {
+                    jsonObject.addProperty(fieldName, Byte.toString(value.getByte(fieldName)));
+                } else if (value.getNumValues(fieldName) > 1) {
                     JsonArray values = new JsonArray();
-                    for (Object valueElement : readResponseItem.getValues()) {
-                        values.add((Byte) valueElement);
-                    }
-                    jsonObject.add(name, values);
+                    value.getAllBytes(fieldName).forEach(values::add);
+                    jsonObject.add(fieldName, values);
                 }
-            }
+            });
             return jsonObject.toString();
         });
 
diff --git a/integrations/apache-edgent/src/main/java/org/apache/plc4x/edgent/PlcConnectionAdapter.java b/integrations/apache-edgent/src/main/java/org/apache/plc4x/edgent/PlcConnectionAdapter.java
index 41a1761..06975f3 100644
--- a/integrations/apache-edgent/src/main/java/org/apache/plc4x/edgent/PlcConnectionAdapter.java
+++ b/integrations/apache-edgent/src/main/java/org/apache/plc4x/edgent/PlcConnectionAdapter.java
@@ -18,8 +18,7 @@ under the License.
 */
 package org.apache.plc4x.edgent;
 
-import java.util.Calendar;
-
+import com.google.gson.JsonObject;
 import org.apache.edgent.function.Consumer;
 import org.apache.edgent.function.Function;
 import org.apache.edgent.function.Supplier;
@@ -31,13 +30,14 @@ import org.apache.plc4x.java.api.exceptions.PlcException;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
 import org.apache.plc4x.java.api.messages.PlcWriteRequest;
-import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadRequest;
-import org.apache.plc4x.java.api.messages.specific.TypeSafePlcWriteRequest;
 import org.apache.plc4x.java.api.model.PlcField;
+import org.apache.plc4x.java.api.types.PlcClientDatatype;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.gson.JsonObject;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
 
 /**
  * PlcConnectionAdapter encapsulates a plc4x {@link PlcConnection}.
@@ -65,17 +65,18 @@ import com.google.gson.JsonObject;
 public class PlcConnectionAdapter implements AutoCloseable {
 
     private static final Logger logger = LoggerFactory.getLogger(PlcConnectionAdapter.class);
-    private static final Class<?>[] allowedDataTypes = new Class[]{Boolean.class, Byte.class, Short.class, Integer.class, Float.class,  String.class, Calendar.class};
-    
+
+    private static final String FIELD_NAME = "default";
+
     private String plcConnectionUrl;
     private PlcConnection plcConnection;
 
     /*
-   * NOTES:
-   * - if we get to the point of the application needing some feedback (possibly control)
-   *   of read or write errors, my thinking is to enhance the PlcConnectionAdapter
-   *   to enable the app to register an error callback handler or such.
-   */
+     * NOTES:
+     * - if we get to the point of the application needing some feedback (possibly control)
+     *   of read or write errors, my thinking is to enhance the PlcConnectionAdapter
+     *   to enable the app to register an error callback handler or such.
+     */
 
     public PlcConnectionAdapter(PlcConnection plcConnection) {
         this.plcConnection = plcConnection;
@@ -102,44 +103,9 @@ public class PlcConnectionAdapter implements AutoCloseable {
         }
     }
 
-    public PlcField prepareField(String fieldString) throws PlcException {
-        return getConnection().prepareField(fieldString);
-    }
-
-    <T> Supplier<T> newSupplier(Class<T> datatype, String fieldString) {
-        PlcConnectionAdapter.checkDatatype(datatype);
-        // satisfy sonar's "Reduce number of anonymous class lines" code smell
-        return new MySupplier<>(datatype, fieldString);
-    }
-    
-    private class MySupplier<T> implements Supplier<T> {
-        private static final long serialVersionUID = 1L;
-        private Class<T> datatype;
-        private String addressStr;
-      
-        MySupplier(Class<T> datatype, String addressStr) {
-            this.datatype = datatype;
-            this.addressStr = addressStr;
-        }
-
-        @Override
-        public T get() {
-            PlcConnection connection = null;
-            PlcField field = null;
-            try {
-                connection = getConnection();
-                field = connection.prepareField(addressStr);
-                PlcReader reader = connection.getReader()
-                  .orElseThrow(() -> new NullPointerException("No reader available"));
-                TypeSafePlcReadRequest<T> readRequest = PlcConnectionAdapter.newPlcReadRequest(datatype, field);
-                return reader.read(readRequest).get().getResponseItem()
-                  .orElseThrow(() -> new IllegalStateException("No response available"))
-                  .getValues().get(0);
-            } catch (Exception e) {
-                logger.error("reading from plc device {} {} failed", connection, field, e);
-                return null;
-            }
-        }
+    public PlcReadRequest.Builder readRequestBuilder() throws PlcException {
+        return getConnection().getReader().orElseThrow(
+            () -> new PlcException("This connection doesn't support reading")).readRequestBuilder();
     }
 
     Supplier<PlcReadResponse> newSupplier(PlcReadRequest readRequest) {
@@ -152,7 +118,7 @@ public class PlcConnectionAdapter implements AutoCloseable {
                 try {
                     connection = getConnection();
                     PlcReader reader = connection.getReader()
-                        .orElseThrow(() -> new NullPointerException("No reader available"));
+                        .orElseThrow(() -> new PlcException("This connection doesn't support reading"));
                     return reader.read(readRequest).get();
                 } catch (Exception e) {
                     logger.error("reading from plc device {} {} failed", connection, readRequest, e);
@@ -162,72 +128,217 @@ public class PlcConnectionAdapter implements AutoCloseable {
         };
     }
 
-    <T> Consumer<T> newConsumer(Class<T> datatype, String addressStr) {
-        PlcConnectionAdapter.checkDatatype(datatype);
-        return new Consumer<T>() {
-            private static final long serialVersionUID = 1L;
+    <T> Supplier<T> newSupplier(Class<T> genericDatatype, PlcClientDatatype clientDatatype, String fieldQuery) {
+        // satisfy sonar's "Reduce number of anonymous class lines" code smell
+        return new MySupplier<>(genericDatatype, clientDatatype, fieldQuery);
+    }
 
-            @Override
-            public void accept(T arg0) {
-                PlcConnection connection = null;
-                PlcField field = null;
-                try {
-                    connection = getConnection();
-                    field = connection.prepareField(addressStr);
-                    PlcWriter writer = connection.getWriter()
-                        .orElseThrow(() -> new NullPointerException("No writer available"));
-                    PlcWriteRequest writeReq = PlcConnectionAdapter.newPlcWriteRequest(field, arg0);
-                    writer.write(writeReq).get();
-                } catch (Exception e) {
-                    logger.error("writing to plc device {} {} failed", connection, field, e);
+    private class MySupplier<T> implements Supplier<T> {
+
+        private static final long serialVersionUID = 1L;
+
+        private Class<T> genericDatatype;
+        private PlcClientDatatype clientDatatype;
+        private String fieldQuery;
+
+        MySupplier(Class<T> genericDatatype, PlcClientDatatype clientDatatype, String fieldQuery) {
+            this.genericDatatype = genericDatatype;
+            this.clientDatatype = clientDatatype;
+            this.fieldQuery = fieldQuery;
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public T get() {
+            PlcConnection connection = null;
+            PlcField field = null;
+            try {
+                connection = getConnection();
+                field = connection.prepareField(fieldQuery);
+                PlcReader reader = connection.getReader()
+                    .orElseThrow(() -> new PlcException("This connection doesn't support reading"));
+                PlcReadRequest readRequest = reader.readRequestBuilder().addItem(FIELD_NAME, fieldQuery).build();
+                PlcReadResponse readResponse = reader.read(readRequest).get();
+                Object value = null;
+                switch (clientDatatype) {
+                    case RAW:
+                        value = readResponse.getRaw(FIELD_NAME);
+                        break;
+                    case OBJECT:
+                        value = readResponse.getObject(FIELD_NAME);
+                        break;
+                    case BYTE:
+                        value = readResponse.getByte(FIELD_NAME);
+                        break;
+                    case SHORT:
+                        value = readResponse.getShort(FIELD_NAME);
+                        break;
+                    case INTEGER:
+                        value = readResponse.getInteger(FIELD_NAME);
+                        break;
+                    case LONG:
+                        value = readResponse.getLong(FIELD_NAME);
+                        break;
+                    case FLOAT:
+                        value = readResponse.getFloat(FIELD_NAME);
+                        break;
+                    case DOUBLE:
+                        value = readResponse.getDouble(FIELD_NAME);
+                        break;
+                    case STRING:
+                        value = readResponse.getString(FIELD_NAME);
+                        break;
+                    case TIME:
+                        value = readResponse.getTime(FIELD_NAME);
+                        break;
+                    case DATE:
+                        value = readResponse.getDate(FIELD_NAME);
+                        break;
+                    case DATE_TIME:
+                        value = readResponse.getDateTime(FIELD_NAME);
+                        break;
+                }
+                if (value != null) {
+                    if (genericDatatype.isAssignableFrom(value.getClass())) {
+                        return (T) value;
+                    } else {
+                        logger.error("types don't match {} should be of type {}", value.getClass(), genericDatatype);
+                    }
                 }
+            } catch (Exception e) {
+                logger.error("reading from plc device {} {} failed", connection, field, e);
             }
+            return null;
+        }
+    }
 
-        };
+    <T> Consumer<T> newJsonConsumer(Class<T> genericDatatype, PlcClientDatatype clientDatatype, String fieldQuery) {
+        return new ObjectConsumer<>(genericDatatype, clientDatatype, fieldQuery);
     }
 
-    <T> Consumer<JsonObject> newConsumer(Class<T> datatype, Function<JsonObject, String> addressFn, Function<JsonObject, T> valueFn) {
-        PlcConnectionAdapter.checkDatatype(datatype);
-        return new Consumer<JsonObject>() {
-            private static final long serialVersionUID = 1L;
+    <T> Consumer<JsonObject> newJsonConsumer(PlcClientDatatype clientDatatype, Function<JsonObject, String> fieldQueryFn, Function<JsonObject, T> fieldValueFn) {
+        return new JsonConsumer<>(clientDatatype, fieldQueryFn, fieldValueFn);
+    }
 
-            @Override
-            public void accept(JsonObject jo) {
-                PlcConnection connection = null;
-                PlcField field = null;
-                try {
-                    connection = getConnection();
-                    String addressStr = addressFn.apply(jo);
-                    field = connection.prepareField(addressStr);
-                    T value = valueFn.apply(jo);
-                    PlcWriter writer = connection.getWriter()
-                        .orElseThrow(() -> new NullPointerException("No writer available"));
-                    PlcWriteRequest writeReq = newPlcWriteRequest(field, value);
-                    writer.write(writeReq).get();
-                } catch (Exception e) {
-                    logger.error("writing to plc device {} {} failed", connection, field, e);
-                }
+    private abstract class BaseConsumer<T> implements Consumer<T> {
+
+        protected void write(PlcClientDatatype clientDatatype, String fieldQuery, Object fieldValue) {
+            PlcConnection connection = null;
+            try {
+                connection = getConnection();
+                PlcWriter writer = connection.getWriter()
+                    .orElseThrow(() -> new PlcException("This connection doesn't support writing"));
+                PlcWriteRequest.Builder builder = writer.writeRequestBuilder();
+                addItem(builder, clientDatatype, fieldQuery, fieldValue); // populate the builder BEFORE building the request
+                PlcWriteRequest writeRequest = builder.build(); // built last so the request contains the item added above
+                writer.write(writeRequest).get(); // wait for the write future to complete
+            } catch (Exception e) {
+                logger.error("writing to plc device {} {} failed", connection, fieldQuery, e); // failure is logged, not rethrown
+            }
 
-        };
-    }
+        }
 
-    static void checkDatatype(Class<?> cls) {
-        for (Class<?> check: allowedDataTypes) {
-            if (check == cls)
-                return;
+        private void addItem(PlcWriteRequest.Builder builder,
+                               PlcClientDatatype clientDatatype, String fieldQuery, Object fieldValue) {
+            switch (clientDatatype) {
+                case RAW:
+                    if (fieldValue instanceof byte[]) {
+                        builder.addItem(FIELD_NAME, fieldQuery, (byte[]) fieldValue);
+                    }
+                    break;
+                case OBJECT:
+                    builder.addItem(FIELD_NAME, fieldQuery, fieldValue);
+                    break;
+                case BYTE:
+                    if (fieldValue instanceof Byte) {
+                        builder.addItem(FIELD_NAME, fieldQuery, (Byte) fieldValue);
+                    }
+                    break;
+                case SHORT:
+                    if (fieldValue instanceof Short) {
+                        builder.addItem(FIELD_NAME, fieldQuery, (Short) fieldValue);
+                    }
+                    break;
+                case INTEGER:
+                    if (fieldValue instanceof Integer) {
+                        builder.addItem(FIELD_NAME, fieldQuery, (Integer) fieldValue);
+                    }
+                    break;
+                case LONG:
+                    if (fieldValue instanceof Long) {
+                        builder.addItem(FIELD_NAME, fieldQuery, (Long) fieldValue);
+                    }
+                    break;
+                case FLOAT:
+                    if (fieldValue instanceof Float) {
+                        builder.addItem(FIELD_NAME, fieldQuery, (Float) fieldValue);
+                    }
+                    break;
+                case DOUBLE:
+                    if (fieldValue instanceof Double) {
+                        builder.addItem(FIELD_NAME, fieldQuery, (Double) fieldValue);
+                    }
+                    break;
+                case STRING:
+                    if (fieldValue instanceof String) {
+                        builder.addItem(FIELD_NAME, fieldQuery, (String) fieldValue);
+                    }
+                    break;
+                case TIME:
+                    if (fieldValue instanceof LocalTime) {
+                        builder.addItem(FIELD_NAME, fieldQuery, (LocalTime) fieldValue);
+                    }
+                    break;
+                case DATE:
+                    if (fieldValue instanceof LocalDate) {
+                        builder.addItem(FIELD_NAME, fieldQuery, (LocalDate) fieldValue);
+                    }
+                    break;
+                case DATE_TIME:
+                    if (fieldValue instanceof LocalDateTime) {
+                        builder.addItem(FIELD_NAME, fieldQuery, (LocalDateTime) fieldValue);
+                    }
+                    break;
+            }
         }
-        throw new IllegalArgumentException("Not a legal plc data type: " + cls.getSimpleName());
     }
 
-    @SuppressWarnings("unchecked")
-    static <T> TypeSafePlcWriteRequest<T> newPlcWriteRequest(PlcField field, T value) {
-        Class<T> cls = (Class<T>) value.getClass();
-        return new TypeSafePlcWriteRequest<>(cls, field, value);
+    private class ObjectConsumer<T> extends  BaseConsumer<T> {
+        private static final long serialVersionUID = 1L;
+
+        private PlcClientDatatype clientDatatype;
+        private String fieldQuery;
+
+        ObjectConsumer(Class<T> genericDatatype, PlcClientDatatype clientDatatype, String fieldQuery) {
+            this.clientDatatype = clientDatatype;
+            this.fieldQuery = fieldQuery;
+        }
+
+        @Override
+        public void accept(Object fieldValue) {
+            write(clientDatatype, fieldQuery, fieldValue);
+        }
     }
 
-    static <T> TypeSafePlcReadRequest<T> newPlcReadRequest(Class<T> datatype, PlcField field) {
-        return new TypeSafePlcReadRequest<>(datatype, field);
+    private class JsonConsumer<T> extends BaseConsumer<JsonObject> {
+        private static final long serialVersionUID = 1L;
+
+        private PlcClientDatatype clientDatatype;
+        private Function<JsonObject, String> fieldQueryFn;
+        private Function<JsonObject, T> fieldValueFn;
+
+        JsonConsumer(PlcClientDatatype clientDatatype, Function<JsonObject, String> fieldQueryFn, Function<JsonObject, T> fieldValueFn) {
+            this.clientDatatype = clientDatatype;
+            this.fieldQueryFn = fieldQueryFn;
+            this.fieldValueFn = fieldValueFn;
+        }
+
+        @Override
+        public void accept(JsonObject jsonObject) {
+            String fieldQuery = fieldQueryFn.apply(jsonObject);
+            Object fieldValue = fieldValueFn.apply(jsonObject);
+            write(clientDatatype, fieldQuery, fieldValue);
+        }
     }
 
 }
diff --git a/integrations/apache-edgent/src/main/java/org/apache/plc4x/edgent/PlcFunctions.java b/integrations/apache-edgent/src/main/java/org/apache/plc4x/edgent/PlcFunctions.java
index 526c805..83a9ddc 100644
--- a/integrations/apache-edgent/src/main/java/org/apache/plc4x/edgent/PlcFunctions.java
+++ b/integrations/apache-edgent/src/main/java/org/apache/plc4x/edgent/PlcFunctions.java
@@ -18,183 +18,267 @@ under the License.
 */
 package org.apache.plc4x.edgent;
 
-import java.util.Calendar;
-
+import com.google.gson.JsonObject;
 import org.apache.edgent.function.Consumer;
 import org.apache.edgent.function.Function;
 import org.apache.edgent.function.Supplier;
-
-import com.google.gson.JsonObject;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.types.PlcClientDatatype;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
 
 /**
  * WIP - A plc4x Apache Edgent {@link Supplier} and {@link Consumer} connector factory.
  * <p>
  * TODO:
- * Are there cases where a single logical poll would want to read from 
+ * Are there cases where a single logical poll would want to read from
  * multiple addrs/data (of different types) from a device and bundle the values
  * into a single TStream tuple (e.g., a JsonObject)?  How would we support that?
  * Is there a similar need for writing to multiple addrs/values on a device?
  * Ah... NOTE: plc4c "batch" requests are coming and will help to address this.
- * 
+ *
  * <p>
  * Sample use to read plc device data into an Edgent TStream:
  * <pre>{@code
  * PlcConnection plcConnection = new PlcDriverManager().getConnection("s7://192.168.0.1/0/0");
  * plcConnection.connect();
  * PlcConnectionAdapter adapter = new PlcConnectionAdapter(plcConnection));
- * 
+ *
  * DirectProvider dp = new DirectProvider();
  * Topology top = dp.newTopology();
  * TStream<Byte> stream = top.poll(PlcFunctions.byteSupplier(adapter, "INPUTS/0", 1, TimeUnit.SECONDS);
  * stream.print();
  * dp.submit(top);
  * }</pre>
- * 
+ * <p>
  * Sample use to write Edgent TStream data to a plc device:
  * <pre>{@code
  * PlcConnection plcConnection = new PlcDriverManager().getConnection("s7://192.168.0.1/0/0");
  * plcConnection.connect();
  * PlcConnectionAdapter adapter = new PlcConnectionAdapter(plcConnection);
- * 
+ *
  * DirectProvider dp = new DirectProvider();
  * Topology top = dp.newTopology();
- * 
+ *
  * TStream<Byte> stream = ...
  * stream.print();
  * TSink<Byte> sink = stream.sink(PlcFunctions.byteConsumer(adapter, "OUTPUTS/0"));
- * 
+ *
  * dp.submit(top);
  * }</pre>
- * 
  */
 public class PlcFunctions {
 
-  private PlcFunctions() {
-    throw new IllegalStateException("Utility class!");
-  }
-
-  /**
-   * Create a new Edgent {@link Supplier} to read data from the 
-   * plc device.
-   * <p>
-   * Every call to the returned {@link Supplier#get()} reads a
-   * new data value from the plc device address and connection
-   * associated with the {@code PlcConnectionAdapter}.
-   * <p>
-   * 
-   * @param adapter the @{link PlcConnectionAdapter}
-   * @param addressStr the plc device address string
-   * @return the {@code Supplier<T>}
-   *
-   * // TODO: No need to import the Topology module for just this comment ... I think
-   * //see org.apache.edgent.topology.Topology#poll(Supplier, long, java.util.concurrent.TimeUnit)
-   */
-  public static Supplier<Boolean> booleanSupplier(PlcConnectionAdapter adapter, String addressStr) {
-    return adapter.newSupplier(Boolean.class, addressStr);
-  }
-  public static Supplier<Byte> byteSupplier(PlcConnectionAdapter adapter, String addressStr) {
-    return adapter.newSupplier(Byte.class, addressStr);
-  }
-  public static Supplier<Short> shortSupplier(PlcConnectionAdapter adapter, String addressStr) {
-    return adapter.newSupplier(Short.class, addressStr);
-  }
-  public static Supplier<Integer> integerSupplier(PlcConnectionAdapter adapter, String addressStr) {
-    return adapter.newSupplier(Integer.class, addressStr);
-  }
-  public static Supplier<Float> floatSupplier(PlcConnectionAdapter adapter, String addressStr) {
-    return adapter.newSupplier(Float.class, addressStr);
-  }
-  public static Supplier<String> stringSupplier(PlcConnectionAdapter adapter, String addressStr) {
-    return adapter.newSupplier(String.class, addressStr);
-  }
-  public static Supplier<Calendar> calendarSupplier(PlcConnectionAdapter adapter, String addressStr) {
-    return adapter.newSupplier(Calendar.class, addressStr);
-  }
-  public static Supplier<PlcReadResponse> batchSupplier(PlcConnectionAdapter adapter, PlcReadRequest readRequest) {
-    return adapter.newSupplier(readRequest);
-  }
-
-  /**
-   * Create a new Edgent {@link Consumer} to write data to the 
-   * plc device.
-   * <p>
-   * Every call to the returned {@link Consumer#accept(Object)}
-   * writes the value to the the device address and connection
-   * associated with the {@code PlcConnectionAdapter}.
-   * 
-   * @param adapter the @{link PlcConnectionAdapter}
-   * @param addressStr the plc device address string
-   * @return the {@code Consumer<T>}
-   *
-   * // TODO: No need to import the Topology module for just this comment ... I think
-   * //see org.apache.edgent.topology.TStream#sink(Consumer)
-   */
-  public static Consumer<Boolean> booleanConsumer(PlcConnectionAdapter adapter, String addressStr) {
-    return adapter.newConsumer(Boolean.class, addressStr);
-  }
-  public static Consumer<Byte> byteConsumer(PlcConnectionAdapter adapter, String addressStr) {
-    return adapter.newConsumer(Byte.class, addressStr);
-  }
-  public static Consumer<Short> shortConsumer(PlcConnectionAdapter adapter, String addressStr) {
-    return adapter.newConsumer(Short.class, addressStr);
-  }
-  public static Consumer<Integer> integerConsumer(PlcConnectionAdapter adapter, String addressStr) {
-    return adapter.newConsumer(Integer.class, addressStr);
-  }
-  public static Consumer<Float> floatConsumer(PlcConnectionAdapter adapter, String addressStr) {
-    return adapter.newConsumer(Float.class, addressStr);
-  }
-  public static Consumer<String> stringConsumer(PlcConnectionAdapter adapter, String addressStr) {
-    return adapter.newConsumer(String.class, addressStr);
-  }
-  public static Consumer<Calendar> calendarConsumer(PlcConnectionAdapter adapter, String addressStr) {
-    return adapter.newConsumer(Calendar.class, addressStr);
-  }
-
-  /**
-   * Create a new Edgent {@link Consumer} to write data to the 
-   * plc device.
-   * <p>
-   * TODO: Is it premature to supply this?
-   * <p>
-   * Every call to the returned {@link Consumer#accept(Object)}
-   * <ul>
-   * <li>calls {@code addressFn} to get the device address string</li>
-   * <li>calls {@code valueFn} to get the {@code T} to write</li>
-   * <li>writes the value to the device address using the connection
-   * associated with the {@code PlcConnectionAdapter}.</li>
-   * </ul>
-   * 
-   * @param adapter the @{link PlcConnectionAdapter}
-   * @param addressFn {@code Function} the returns a device {@code PlcField} from a {@code JsonObject}
-   * @param valueFn {@code Function} the returns a {@code Value} from a {@code JsonObject}
-   * @return the {@code Consumer<JsonObject>}
-   *
-   * // TODO: No need to import the Topology module for just this comment ... I think
-   * //see org.apache.edgent.topology.TStream#sink(Consumer)
-   */
-  public static Consumer<JsonObject> booleanConsumer(PlcConnectionAdapter adapter, Function<JsonObject,String> addressFn, Function<JsonObject,Boolean> valueFn) {
-    return adapter.newConsumer(Boolean.class, addressFn, valueFn);
-  }
-  public static Consumer<JsonObject> byteConsumer(PlcConnectionAdapter adapter, Function<JsonObject,String> addressFn, Function<JsonObject,Byte> valueFn) {
-    return adapter.newConsumer(Byte.class, addressFn, valueFn);
-  }
-  public static Consumer<JsonObject> shortConsumer(PlcConnectionAdapter adapter, Function<JsonObject,String> addressFn, Function<JsonObject,Short> valueFn) {
-    return adapter.newConsumer(Short.class, addressFn, valueFn);
-  }
-  public static Consumer<JsonObject> integerConsumer(PlcConnectionAdapter adapter, Function<JsonObject,String> addressFn, Function<JsonObject,Integer> valueFn) {
-    return adapter.newConsumer(Integer.class, addressFn, valueFn);
-  }
-  public static Consumer<JsonObject> floatConsumer(PlcConnectionAdapter adapter, Function<JsonObject,String> addressFn, Function<JsonObject,Float> valueFn) {
-    return adapter.newConsumer(Float.class, addressFn, valueFn);
-  }
-  public static Consumer<JsonObject> stringConsumer(PlcConnectionAdapter adapter, Function<JsonObject,String> addressFn, Function<JsonObject,String> valueFn) {
-    return adapter.newConsumer(String.class, addressFn, valueFn);
-  }
-  public static Consumer<JsonObject> calendarConsumer(PlcConnectionAdapter adapter, Function<JsonObject,String> addressFn, Function<JsonObject,Calendar> valueFn) {
-    return adapter.newConsumer(Calendar.class, addressFn, valueFn);
-  }
-  
+    private PlcFunctions() {
+        throw new IllegalStateException("Utility class!");
+    }
+
+    /**
+     * Create a new Edgent {@link Supplier} to read data from the
+     * plc device.
+     * <p>
+     * Every call to the returned {@link Supplier#get()} reads a
+     * new data value from the plc device address and connection
+     * associated with the {@code PlcConnectionAdapter}.
+     * <p>
+     *
+     * @param adapter    the {@link PlcConnectionAdapter}
+     * @param addressStr the plc device address string
+     * @return the {@code Supplier<T>}
+     * <p>
+     * // TODO: No need to import the Topology module for just this comment ... I think
+     * //see org.apache.edgent.topology.Topology#poll(Supplier, long, java.util.concurrent.TimeUnit)
+     */
+    public static Supplier<byte[]> rawSupplier(PlcConnectionAdapter adapter, String addressStr) {
+        return adapter.newSupplier(byte[].class, PlcClientDatatype.RAW, addressStr);
+    }
+
+    public static Supplier<Object> objectSupplier(PlcConnectionAdapter adapter, String addressStr) {
+        return adapter.newSupplier(Object.class, PlcClientDatatype.OBJECT, addressStr);
+    }
+
+    public static Supplier<Boolean> booleanSupplier(PlcConnectionAdapter adapter, String addressStr) {
+        return adapter.newSupplier(Boolean.class, PlcClientDatatype.BOOLEAN, addressStr);
+    }
+
+    public static Supplier<Byte> byteSupplier(PlcConnectionAdapter adapter, String addressStr) {
+        return adapter.newSupplier(Byte.class, PlcClientDatatype.BYTE, addressStr);
+    }
+
+    public static Supplier<Short> shortSupplier(PlcConnectionAdapter adapter, String addressStr) {
+        return adapter.newSupplier(Short.class, PlcClientDatatype.SHORT, addressStr);
+    }
+
+    public static Supplier<Integer> integerSupplier(PlcConnectionAdapter adapter, String addressStr) {
+        return adapter.newSupplier(Integer.class, PlcClientDatatype.INTEGER, addressStr);
+    }
+
+    public static Supplier<Long> longSupplier(PlcConnectionAdapter adapter, String addressStr) {
+        return adapter.newSupplier(Long.class, PlcClientDatatype.LONG, addressStr);
+    }
+
+    public static Supplier<Float> floatSupplier(PlcConnectionAdapter adapter, String addressStr) {
+        return adapter.newSupplier(Float.class, PlcClientDatatype.FLOAT, addressStr);
+    }
+
+    public static Supplier<Double> doubleSupplier(PlcConnectionAdapter adapter, String addressStr) {
+        return adapter.newSupplier(Double.class, PlcClientDatatype.DOUBLE, addressStr);
+    }
+
+    public static Supplier<String> stringSupplier(PlcConnectionAdapter adapter, String addressStr) {
+        return adapter.newSupplier(String.class, PlcClientDatatype.STRING, addressStr);
+    }
+
+    public static Supplier<LocalTime> timeSupplier(PlcConnectionAdapter adapter, String addressStr) {
+        return adapter.newSupplier(LocalTime.class, PlcClientDatatype.TIME, addressStr);
+    }
+
+    public static Supplier<LocalDate> dateSupplier(PlcConnectionAdapter adapter, String addressStr) {
+        return adapter.newSupplier(LocalDate.class, PlcClientDatatype.DATE, addressStr);
+    }
+
+    public static Supplier<LocalDateTime> dateTimeSupplier(PlcConnectionAdapter adapter, String addressStr) {
+        return adapter.newSupplier(LocalDateTime.class, PlcClientDatatype.DATE_TIME, addressStr);
+    }
+
+    public static Supplier<PlcReadResponse> batchSupplier(PlcConnectionAdapter adapter, PlcReadRequest readRequest) {
+        return adapter.newSupplier(readRequest);
+    }
+
+    /**
+     * Create a new Edgent {@link Consumer} to write data to the
+     * plc device.
+     * <p>
+     * Every call to the returned {@link Consumer#accept(Object)}
+     * writes the value to the device address and connection
+     * associated with the {@code PlcConnectionAdapter}.
+     *
+     * @param adapter    the {@link PlcConnectionAdapter}
+     * @param addressStr the plc device address string
+     * @return the {@code Consumer<T>}
+     * <p>
+     * // TODO: No need to import the Topology module for just this comment ... I think
+     * //see org.apache.edgent.topology.TStream#sink(Consumer)
+     */
+    public static Consumer<byte[]> rawConsumer(PlcConnectionAdapter adapter, String addressStr) {
+        return adapter.newJsonConsumer(byte[].class, PlcClientDatatype.RAW, addressStr);
+    }
+
+    public static Consumer<Object> objectConsumer(PlcConnectionAdapter adapter, String addressStr) {
+        return adapter.newJsonConsumer(Object.class, PlcClientDatatype.OBJECT, addressStr);
+    }
+
+    public static Consumer<Boolean> booleanConsumer(PlcConnectionAdapter adapter, String addressStr) {
+        return adapter.newJsonConsumer(Boolean.class, PlcClientDatatype.BOOLEAN, addressStr);
+    }
+
+    public static Consumer<Byte> byteConsumer(PlcConnectionAdapter adapter, String addressStr) {
+        return adapter.newJsonConsumer(Byte.class, PlcClientDatatype.BYTE, addressStr);
+    }
+
+    public static Consumer<Short> shortConsumer(PlcConnectionAdapter adapter, String addressStr) {
+        return adapter.newJsonConsumer(Short.class, PlcClientDatatype.SHORT, addressStr);
+    }
+
+    public static Consumer<Integer> integerConsumer(PlcConnectionAdapter adapter, String addressStr) {
+        return adapter.newJsonConsumer(Integer.class, PlcClientDatatype.INTEGER, addressStr);
+    }
+
+    public static Consumer<Long> longConsumer(PlcConnectionAdapter adapter, String addressStr) {
+        return adapter.newJsonConsumer(Long.class, PlcClientDatatype.LONG, addressStr);
+    }
+
+    public static Consumer<Float> floatConsumer(PlcConnectionAdapter adapter, String addressStr) {
+        return adapter.newJsonConsumer(Float.class, PlcClientDatatype.FLOAT, addressStr);
+    }
+
+    public static Consumer<Double> doubleConsumer(PlcConnectionAdapter adapter, String addressStr) {
+        return adapter.newJsonConsumer(Double.class, PlcClientDatatype.DOUBLE, addressStr);
+    }
+
+    public static Consumer<String> stringConsumer(PlcConnectionAdapter adapter, String addressStr) {
+        return adapter.newJsonConsumer(String.class, PlcClientDatatype.STRING, addressStr);
+    }
+
+    public static Consumer<LocalTime> timeConsumer(PlcConnectionAdapter adapter, String addressStr) {
+        return adapter.newJsonConsumer(LocalTime.class, PlcClientDatatype.TIME, addressStr);
+    }
+
+    public static Consumer<LocalDate> dateConsumer(PlcConnectionAdapter adapter, String addressStr) {
+        return adapter.newJsonConsumer(LocalDate.class, PlcClientDatatype.DATE, addressStr);
+    }
+
+    public static Consumer<LocalDateTime> dateTimeConsumer(PlcConnectionAdapter adapter, String addressStr) {
+        return adapter.newJsonConsumer(LocalDateTime.class, PlcClientDatatype.DATE_TIME, addressStr);
+    }
+
+    /**
+     * Create a new Edgent {@link Consumer} to write data to the
+     * plc device.
+     * <p>
+     * TODO: Is it premature to supply this?
+     * <p>
+     * Every call to the returned {@link Consumer#accept(Object)}
+     * <ul>
+     * <li>calls {@code addressFn} to get the device address string</li>
+     * <li>calls {@code valueFn} to get the {@code T} to write</li>
+     * <li>writes the value to the device address using the connection
+     * associated with the {@code PlcConnectionAdapter}.</li>
+     * </ul>
+     *
+     * @param adapter   the {@link PlcConnectionAdapter}
+     * @param addressFn {@code Function} that returns the device address string from a {@code JsonObject}
+     * @param valueFn   {@code Function} that returns the value to write from a {@code JsonObject}
+     * @return the {@code Consumer<JsonObject>}
+     * <p>
+     * // TODO: No need to import the Topology module for just this comment ... I think
+     * //see org.apache.edgent.topology.TStream#sink(Consumer)
+     */
+    public static Consumer<JsonObject> booleanConsumer(PlcConnectionAdapter adapter, Function<JsonObject, String> addressFn, Function<JsonObject, Boolean> valueFn) {
+        return adapter.newJsonConsumer(PlcClientDatatype.BOOLEAN, addressFn, valueFn);
+    }
+
+    public static Consumer<JsonObject> byteConsumer(PlcConnectionAdapter adapter, Function<JsonObject, String> addressFn, Function<JsonObject, Byte> valueFn) {
+        return adapter.newJsonConsumer(PlcClientDatatype.BYTE, addressFn, valueFn);
+    }
+
+    public static Consumer<JsonObject> shortConsumer(PlcConnectionAdapter adapter, Function<JsonObject, String> addressFn, Function<JsonObject, Short> valueFn) {
+        return adapter.newJsonConsumer(PlcClientDatatype.SHORT, addressFn, valueFn);
+    }
+
+    public static Consumer<JsonObject> integerConsumer(PlcConnectionAdapter adapter, Function<JsonObject, String> addressFn, Function<JsonObject, Integer> valueFn) {
+        return adapter.newJsonConsumer(PlcClientDatatype.INTEGER, addressFn, valueFn);
+    }
+
+    public static Consumer<JsonObject> longConsumer(PlcConnectionAdapter adapter, Function<JsonObject, String> addressFn, Function<JsonObject, Long> valueFn) {
+        return adapter.newJsonConsumer(PlcClientDatatype.LONG, addressFn, valueFn);
+    }
+
+    public static Consumer<JsonObject> floatConsumer(PlcConnectionAdapter adapter, Function<JsonObject, String> addressFn, Function<JsonObject, Float> valueFn) {
+        return adapter.newJsonConsumer(PlcClientDatatype.FLOAT, addressFn, valueFn);
+    }
+
+    public static Consumer<JsonObject> doubleConsumer(PlcConnectionAdapter adapter, Function<JsonObject, String> addressFn, Function<JsonObject, Double> valueFn) {
+        return adapter.newJsonConsumer(PlcClientDatatype.DOUBLE, addressFn, valueFn);
+    }
+
+    public static Consumer<JsonObject> stringConsumer(PlcConnectionAdapter adapter, Function<JsonObject, String> addressFn, Function<JsonObject, String> valueFn) {
+        return adapter.newJsonConsumer(PlcClientDatatype.STRING, addressFn, valueFn);
+    }
+
+    public static Consumer<JsonObject> timeConsumer(PlcConnectionAdapter adapter, Function<JsonObject, String> addressFn, Function<JsonObject, LocalTime> valueFn) {
+        return adapter.newJsonConsumer(PlcClientDatatype.TIME, addressFn, valueFn);
+    }
+
+    public static Consumer<JsonObject> dateConsumer(PlcConnectionAdapter adapter, Function<JsonObject, String> addressFn, Function<JsonObject, LocalDate> valueFn) {
+        return adapter.newJsonConsumer(PlcClientDatatype.DATE, addressFn, valueFn);
+    }
+
+    public static Consumer<JsonObject> dateTimeConsumer(PlcConnectionAdapter adapter, Function<JsonObject, String> addressFn, Function<JsonObject, LocalDateTime> valueFn) {
+        return adapter.newJsonConsumer(PlcClientDatatype.DATE_TIME, addressFn, valueFn);
+    }
+
 }
diff --git a/integrations/apache-edgent/src/test/java/org/apache/plc4x/edgent/PlcConnectionAdapterTest.java b/integrations/apache-edgent/src/test/java/org/apache/plc4x/edgent/PlcConnectionAdapterTest.java
index e8279f5..8272b7b 100644
--- a/integrations/apache-edgent/src/test/java/org/apache/plc4x/edgent/PlcConnectionAdapterTest.java
+++ b/integrations/apache-edgent/src/test/java/org/apache/plc4x/edgent/PlcConnectionAdapterTest.java
@@ -350,7 +350,7 @@ public class PlcConnectionAdapterTest {
         adapter.close();
     }
 
-    static <T> void checkSupplier(MockConnection connection, PlcField field, Supplier<T> supplier, Object... values) throws Exception {
+    static <T> void checkSupplier(MockConnection connection, PlcField field, Supplier<T> supplier, Object... values) {
         checkSupplier(0, connection, field, supplier, values);
     }
 
@@ -375,7 +375,7 @@ public class PlcConnectionAdapterTest {
     }
 
     /*
-     * test PlcConnectionAdapter.newConsumer(address)
+     * test PlcConnectionAdapter.newJsonConsumer(address)
      */
     @Test
     @Category(FastTests.class)
@@ -410,7 +410,7 @@ public class PlcConnectionAdapterTest {
     }
 
     /*
-     * test PlcConnectionAdapter.newConsumer(address) with write exception
+     * test PlcConnectionAdapter.newJsonConsumer(address) with write exception
      */
     @Test
     @Category(FastTests.class)
@@ -428,7 +428,7 @@ public class PlcConnectionAdapterTest {
         adapter.close();
     }
 
-    static <T> void checkConsumer(MockConnection connection, PlcField field, Consumer<T> consumer, Object... values) throws Exception {
+    static <T> void checkConsumer(MockConnection connection, PlcField field, Consumer<T> consumer, Object... values) {
         checkConsumer(0, connection, field, consumer, values);
     }
 
@@ -465,7 +465,7 @@ public class PlcConnectionAdapterTest {
     }
 
     /*
-     * test PlcConnectionAdapter.newConsumer(addressFn, valueFn)
+     * test PlcConnectionAdapter.newJsonConsumer(addressFn, valueFn)
      */
     @Test
     @Category(FastTests.class)
@@ -479,29 +479,29 @@ public class PlcConnectionAdapterTest {
 
         Function<JsonObject, String> addressFn = t -> t.get("address").getAsString();
 
-        consumer = adapter.newConsumer(Boolean.class, addressFn, t -> t.get("value").getAsBoolean());
+        consumer = adapter.newJsonConsumer(Boolean.class, addressFn, t -> t.get("value").getAsBoolean());
         checkConsumerJson(connection, address, consumer, true, false);
 
-        consumer = adapter.newConsumer(Byte.class, addressFn, t -> t.get("value").getAsByte());
+        consumer = adapter.newJsonConsumer(Byte.class, addressFn, t -> t.get("value").getAsByte());
         checkConsumerJson(connection, address, consumer, (byte) 0x1, (byte) 0x2, (byte) 0x3);
 
-        consumer = adapter.newConsumer(Short.class, addressFn, t -> t.get("value").getAsShort());
+        consumer = adapter.newJsonConsumer(Short.class, addressFn, t -> t.get("value").getAsShort());
         checkConsumerJson(connection, address, consumer, (short) 1, (short) 2, (short) 3);
 
-        consumer = adapter.newConsumer(Integer.class, addressFn, t -> t.get("value").getAsInt());
+        consumer = adapter.newJsonConsumer(Integer.class, addressFn, t -> t.get("value").getAsInt());
         checkConsumerJson(connection, address, consumer, 1000, 1001, 1002);
 
-        consumer = adapter.newConsumer(Float.class, addressFn, t -> t.get("value").getAsFloat());
+        consumer = adapter.newJsonConsumer(Float.class, addressFn, t -> t.get("value").getAsFloat());
         checkConsumerJson(connection, address, consumer, 1000.5f, 1001.5f, 1002.5f);
 
-        consumer = adapter.newConsumer(String.class, addressFn, t -> t.get("value").getAsString());
+        consumer = adapter.newJsonConsumer(String.class, addressFn, t -> t.get("value").getAsString());
         checkConsumerJson(connection, address, consumer, "one", "two", "three");
 
         adapter.close();
     }
 
     /*
-     * test PlcConnectionAdapter.newConsumer(addressFn, valueFn) with write failure
+     * test PlcConnectionAdapter.newJsonConsumer(addressFn, valueFn) with write failure
      */
     @Test
     @Category(FastTests.class)
@@ -515,7 +515,7 @@ public class PlcConnectionAdapterTest {
 
         Function<JsonObject, String> addressFn = t -> t.get("address").getAsString();
 
-        consumer = adapter.newConsumer(String.class, addressFn, t -> t.get("value").getAsString());
+        consumer = adapter.newJsonConsumer(String.class, addressFn, t -> t.get("value").getAsString());
         checkConsumerJson(2, connection, address, consumer, "one", "two", "three");
 
         adapter.close();
diff --git a/integrations/apache-edgent/src/test/java/org/apache/plc4x/edgent/PlcFunctionsTest.java b/integrations/apache-edgent/src/test/java/org/apache/plc4x/edgent/PlcFunctionsTest.java
index ebebb69..6a3bbcd 100644
--- a/integrations/apache-edgent/src/test/java/org/apache/plc4x/edgent/PlcFunctionsTest.java
+++ b/integrations/apache-edgent/src/test/java/org/apache/plc4x/edgent/PlcFunctionsTest.java
@@ -73,7 +73,7 @@ public class PlcFunctionsTest {
   }
 
   /*
-   * test PlcConnectionAdapter.newConsumer(address)
+   * test PlcConnectionAdapter.newJsonConsumer(address)
    */
   @Test
   @Category(FastTests.class)
@@ -107,7 +107,7 @@ public class PlcFunctionsTest {
   }
 
   /*
-   * test PlcConnectionAdapter.newConsumer(addressFn, valueFn)
+   * test PlcConnectionAdapter.newJsonConsumer(addressFn, valueFn)
    */
   @Test
   @Category(FastTests.class)
diff --git a/integrations/apache-edgent/src/test/java/org/apache/plc4x/edgent/mock/MockConnection.java b/integrations/apache-edgent/src/test/java/org/apache/plc4x/edgent/mock/MockConnection.java
index 2ddd5c6..ed60240 100644
--- a/integrations/apache-edgent/src/test/java/org/apache/plc4x/edgent/mock/MockConnection.java
+++ b/integrations/apache-edgent/src/test/java/org/apache/plc4x/edgent/mock/MockConnection.java
@@ -33,14 +33,6 @@ import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
 import org.apache.plc4x.java.api.messages.PlcWriteRequest;
 import org.apache.plc4x.java.api.messages.PlcWriteResponse;
-import org.apache.plc4x.java.api.messages.items.PlcReadRequestItem;
-import org.apache.plc4x.java.api.messages.items.PlcReadResponseItem;
-import org.apache.plc4x.java.api.messages.items.PlcWriteResponseItem;
-import org.apache.plc4x.java.api.messages.items.PlcWriteRequestItem;
-import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadRequest;
-import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadResponse;
-import org.apache.plc4x.java.api.messages.specific.TypeSafePlcWriteRequest;
-import org.apache.plc4x.java.api.messages.specific.TypeSafePlcWriteResponse;
 import org.apache.plc4x.java.api.model.PlcField;
 import org.apache.plc4x.java.api.types.PlcResponseCode;
 
@@ -89,7 +81,7 @@ public class MockConnection extends org.apache.plc4x.java.base.connection.MockCo
         List<PlcReadResponseItem<Object>> responseItems = new LinkedList<>();
         for (PlcReadRequestItem<?> reqItem : readRequest.getRequestItems()) {
             @SuppressWarnings("unchecked")
-            PlcReadRequestItem<Object> requestItem = (PlcReadRequestItem<Object>) reqItem;
+            PlcReadRequestItem<Object> requestItem = reqItem;
             PlcReadResponseItem<Object> responseItem = new PlcReadResponseItem<>(requestItem, PlcResponseCode.OK,
                 Collections.singletonList(getDataValue(requestItem.getField())));
             responseItems.add(responseItem);
@@ -117,7 +109,7 @@ public class MockConnection extends org.apache.plc4x.java.base.connection.MockCo
         }
         List<PlcWriteResponseItem<Object>> responseItems = new LinkedList<>();
         for (PlcWriteRequestItem<?> reqItem : writeRequest.getRequestItems()) {
-            PlcWriteRequestItem<Object> requestItem = (PlcWriteRequestItem<Object>) reqItem;
+            PlcWriteRequestItem<Object> requestItem = reqItem;
             setDataValue(requestItem.getField(), requestItem.getValues());
             PlcWriteResponseItem<Object> responseItem = new PlcWriteResponseItem<>(requestItem, PlcResponseCode.OK);
             responseItems.add(responseItem);
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcWriter.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcWriter.java
index df31bdd..1b78f45 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcWriter.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcWriter.java
@@ -18,7 +18,6 @@ under the License.
 */
 package org.apache.plc4x.java.api.connection;
 
-import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcWriteRequest;
 import org.apache.plc4x.java.api.messages.PlcWriteResponse;
 
@@ -37,6 +36,6 @@ public interface PlcWriter {
      */
     CompletableFuture<? extends PlcWriteResponse> write(PlcWriteRequest writeRequest);
 
-    PlcReadRequest.Builder writeRequestBuilder();
+    PlcWriteRequest.Builder writeRequestBuilder();
 
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcReadResponse.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcReadResponse.java
index 5bcd731..b9767e4 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcReadResponse.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcReadResponse.java
@@ -21,6 +21,7 @@ package org.apache.plc4x.java.api.messages;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.util.Collection;
 
 /**
  * Response to a {@link PlcReadRequest}.
@@ -32,53 +33,66 @@ public interface PlcReadResponse extends PlcFieldResponse<PlcReadRequest> {
     boolean isRaw(String name);
     byte[] getRaw(String name);
     byte[] getRaw(String name, int index);
+    Collection<byte[]> getAllRaws(String name);
 
     boolean isObject(String name);
     Object getObject(String name);
     Object getObject(String name, int index);
+    Collection<Object> getAllObjects(String name);
 
     boolean isBoolean(String name);
     Boolean getBoolean(String name);
     Boolean getBoolean(String name, int index);
+    Collection<Boolean> getAllBooleans(String name);
 
     boolean isByte(String name);
     Byte getByte(String name);
     Byte getByte(String name, int index);
+    Collection<Byte> getAllBytes(String name);
 
     boolean isShort(String name);
     Short getShort(String name);
     Short getShort(String name, int index);
+    Collection<Short> getAllShorts(String name);
 
     boolean isInteger(String name);
     Integer getInteger(String name);
     Integer getInteger(String name, int index);
+    Collection<Integer> getAllIntegers(String name);
 
     boolean isLong(String name);
     Long getLong(String name);
     Long getLong(String name, int index);
+    Collection<Long> getAllLongs(String name);
 
     boolean isFloat(String name);
     Float getFloat(String name);
     Float getFloat(String name, int index);
+    Collection<Float> getAllFloats(String name);
 
     boolean isDouble(String name);
     Double getDouble(String name);
     Double getDouble(String name, int index);
+    Collection<Double> getAllDoubles(String name);
 
     boolean isString(String name);
     String getString(String name);
     String getString(String name, int index);
+    Collection<String> getAllStrings(String name);
 
     boolean isTime(String name);
     LocalTime getTime(String name);
     LocalTime getTime(String name, int index);
+    Collection<LocalTime> getAllTimes(String name);
 
     boolean isDate(String name);
     LocalDate getDate(String name);
     LocalDate getDate(String name, int index);
+    Collection<LocalDate> getAllDates(String name);
 
     boolean isDateTime(String name);
     LocalDateTime getDateTime(String name);
     LocalDateTime getDateTime(String name, int index);
+    Collection<LocalDateTime> getAllDateTimes(String name);
 
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcWriteRequest.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcWriteRequest.java
index 4017c9d..4d08985 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcWriteRequest.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcWriteRequest.java
@@ -26,31 +26,31 @@ public interface PlcWriteRequest extends PlcFieldRequest {
 
     interface Builder extends PlcMessageBuilder<PlcWriteRequest> {
 
-        /* NOT Quite sure about these two ...
-        PlcReadRequest.Builder addItem(String name, String fieldQuery, byte[]... values);
-        PlcReadRequest.Builder addItem(String name, String fieldQuery, Object... values);*/
+        PlcWriteRequest.Builder addItem(String name, String fieldQuery, byte[]... values);
 
-        PlcReadRequest.Builder addItem(String name, String fieldQuery, Boolean... values);
+        PlcWriteRequest.Builder addItem(String name, String fieldQuery, Object... values);
 
-        PlcReadRequest.Builder addItem(String name, String fieldQuery, Byte... values);
+        PlcWriteRequest.Builder addItem(String name, String fieldQuery, Boolean... values);
 
-        PlcReadRequest.Builder addItem(String name, String fieldQuery, Short... values);
+        PlcWriteRequest.Builder addItem(String name, String fieldQuery, Byte... values);
 
-        PlcReadRequest.Builder addItem(String name, String fieldQuery, Integer... values);
+        PlcWriteRequest.Builder addItem(String name, String fieldQuery, Short... values);
 
-        PlcReadRequest.Builder addItem(String name, String fieldQuery, Long... values);
+        PlcWriteRequest.Builder addItem(String name, String fieldQuery, Integer... values);
 
-        PlcReadRequest.Builder addItem(String name, String fieldQuery, Float... values);
+        PlcWriteRequest.Builder addItem(String name, String fieldQuery, Long... values);
 
-        PlcReadRequest.Builder addItem(String name, String fieldQuery, Double... values);
+        PlcWriteRequest.Builder addItem(String name, String fieldQuery, Float... values);
 
-        PlcReadRequest.Builder addItem(String name, String fieldQuery, String... values);
+        PlcWriteRequest.Builder addItem(String name, String fieldQuery, Double... values);
 
-        PlcReadRequest.Builder addItem(String name, String fieldQuery, LocalTime... values);
+        PlcWriteRequest.Builder addItem(String name, String fieldQuery, String... values);
 
-        PlcReadRequest.Builder addItem(String name, String fieldQuery, LocalDate... values);
+        PlcWriteRequest.Builder addItem(String name, String fieldQuery, LocalTime... values);
 
-        PlcReadRequest.Builder addItem(String name, String fieldQuery, LocalDateTime... values);
+        PlcWriteRequest.Builder addItem(String name, String fieldQuery, LocalDate... values);
+
+        PlcWriteRequest.Builder addItem(String name, String fieldQuery, LocalDateTime... values);
     }
 
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/types/PlcClientDatatype.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/types/PlcClientDatatype.java
new file mode 100644
index 0000000..0c66bc0
--- /dev/null
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/types/PlcClientDatatype.java
@@ -0,0 +1,37 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.java.api.types;
+/** Client-side datatype tags; the constants line up, in order, with the value types of the PlcWriteRequest.Builder#addItem overloads (byte[], Object, Boolean ... LocalDateTime) - presumably one tag per overload, TODO confirm against usages. */
+public enum PlcClientDatatype {
+
+    RAW,
+    OBJECT,
+    BOOLEAN,
+    BYTE,
+    SHORT,
+    INTEGER,
+    LONG,
+    FLOAT,
+    DOUBLE,
+    STRING,
+    TIME,
+    DATE,
+    DATE_TIME
+
+}