You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@plc4x.apache.org by GitBox <gi...@apache.org> on 2018/09/06 21:14:55 UTC

[GitHub] asfgit closed pull request #15: Quick and dirty implementation of Apache Kafka Source Connector

asfgit closed pull request #15: Quick and dirty implementation of Apache Kafka Source Connector
URL: https://github.com/apache/incubator-plc4x/pull/15
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
index f20a8a57a..4de195b44 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
@@ -27,7 +27,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -44,10 +44,10 @@ Licensed to the Apache Software Foundation (ASF) under one
     @Override
     public List<Map<String, String>> taskConfigs(int maxTasks) {
         log.info("Setting task configurations for {} workers.", maxTasks);
-        final List<Map<String, String>> configs = new ArrayList<>(maxTasks);
-        for (int i = 0; i < maxTasks; ++i) {
-            configs.add(configProperties);
-        }
+
+        // Only one task will be created; ignoring maxTasks for now
+        final List<Map<String, String>> configs = Collections.singletonList(configProperties);
+
         return configs;
     }
 
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/common/Plc4xConfig.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/common/Plc4xConfig.java
index bceedfc6e..b906fab82 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/common/Plc4xConfig.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/common/Plc4xConfig.java
@@ -33,6 +33,15 @@ Licensed to the Apache Software Foundation (ASF) under one
     public static final String PLC_CONNECTION_STRING_DISPLAY = "PLC Connection String";
     public static final String PLC_CONNECTION_STRING_DOC = "Connection string used by PLC4X to connect to the PLC.";
 
+    public static final String PLC_TOPIC = "topic";
+    public static final String PLC_TOPIC_DOC = "Kafka topic to publish messages to.";
+
+    public static final String PLC_DATATYPE_CONFIG = "type";
+    public static final String PLC_DATATYPE_DOC = "Data type of values sent or received by PLC.";
+
+    public static final String PLC_ADDRESS = "address";
+    public static final String PLC_ADDRESS_DOC = "PLC address to sent to or receive data from.";
+
     public static ConfigDef baseConfigDef() {
         ConfigDef config = new ConfigDef();
         addPlcOptions(config);
@@ -44,7 +53,22 @@ private static final void addPlcOptions(ConfigDef config) {
             PLC_CONNECTION_STRING_CONFIG,
             Type.STRING,
             Importance.HIGH,
-            PLC_CONNECTION_STRING_DOC);
+            PLC_CONNECTION_STRING_DOC)
+        .define(
+            PLC_DATATYPE_CONFIG,
+            Type.CLASS,
+            Importance.HIGH,
+            PLC_DATATYPE_DOC)
+        .define(
+            PLC_TOPIC,
+            Type.STRING,
+            Importance.HIGH,
+            PLC_TOPIC_DOC)
+        .define(
+            PLC_ADDRESS,
+            Type.STRING,
+            Importance.HIGH,
+            PLC_ADDRESS_DOC);
     }
 
     public static final ConfigDef CONFIG_DEF = baseConfigDef();
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
index 36e37a04e..040df2f7a 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
@@ -19,8 +19,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 package org.apache.plc4x.kafka.source;
 
 import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
@@ -28,6 +27,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.plc4x.java.api.connection.PlcConnection;
 import org.apache.plc4x.java.api.connection.PlcReader;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.exceptions.PlcInvalidAddressException;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
 import org.apache.plc4x.java.api.messages.items.ReadResponseItem;
@@ -36,11 +36,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
+import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -53,6 +49,11 @@ Licensed to the Apache Software Foundation (ASF) under one
     private PlcReader reader;
     private PlcReadRequest readRequest;
     private AtomicBoolean running = new AtomicBoolean(false);
+    private String topic;
+    private Schema keySchema = Schema.STRING_SCHEMA;
+    private Schema valueSchema;
+    private long offset = 0;
+    private final Map<Class<?>, Schema> typeSchemas = initTypeSchemas();
 
     @Override
     public String version() {
@@ -64,7 +65,7 @@ public void start(Map<String, String> properties) {
         try {
             config = new Plc4xSourceConfig(properties);
         } catch (ConfigException e) {
-            throw new ConnectException("Couldn't start JdbcSourceTask due to configuration error", e);
+            throw new ConnectException("Couldn't start Plc4xSourceTask due to configuration error", e);
         }
         final String url = config.getString(Plc4xSourceConfig.PLC_CONNECTION_STRING_CONFIG);
 
@@ -75,12 +76,28 @@ public void start(Map<String, String> properties) {
                 throw new ConnectException("PlcReader not available for this type of connection");
             }
             reader = readerOptional.get();
+            Class<?> dataType = config.getClass(Plc4xSourceConfig.PLC_DATATYPE_CONFIG);
+            String addressString = config.getString(Plc4xSourceConfig.PLC_ADDRESS);
+            Address address = plcConnection.parseAddress(addressString);
+            readRequest = new PlcReadRequest(dataType, address);
+            topic = config.getString(Plc4xSourceConfig.PLC_TOPIC);
+            valueSchema = typeSchemas.get(dataType);
             running.set(true);
         } catch (PlcConnectionException e) {
             throw new ConnectException("Caught exception while connecting to PLC", e);
+        } catch (PlcInvalidAddressException e) {
+            throw new ConnectException("Invalid PLC address", e);
         }
     }
 
+    private Map<Class<?>, Schema> initTypeSchemas() {
+        Map<Class<?>, Schema> map = new HashMap<>();
+        map.put(Boolean.class, Schema.BOOLEAN_SCHEMA);
+        map.put(Integer.class, Schema.INT32_SCHEMA);
+        // TODO add other
+        return map;
+    }
+
     @Override
     public void stop() {
         if(plcConnection != null) {
@@ -103,10 +120,13 @@ public void stop() {
 
                 for (ReadResponseItem<?> responseItem : plcReadResponse.getResponseItems()) {
                     Address address = responseItem.getRequestItem().getAddress();
-                    List<?> values = responseItem.getValues();
-
-                    // TODO: Implement Sending this information to Kafka ...
-                    //results.add(new SourceRecord())
+                    for (Object value : responseItem.getValues()) {
+                        Map<String, String> sourcePartition = Collections.singletonMap("address", address.toString());
+                        Map<String, Long> sourceOffset = Collections.singletonMap("offset", offset);
+                        SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, topic, keySchema, address.toString(), valueSchema, value);
+                        results.add(record);
+                        offset++; // TODO: figure out how to track offsets
+                    }
                 }
             } catch (ExecutionException e) {
                 log.error("Error reading values from PLC", e);
diff --git a/plc4j/protocols/pom.xml b/plc4j/protocols/pom.xml
index 0fca4a1ea..2565f43ed 100644
--- a/plc4j/protocols/pom.xml
+++ b/plc4j/protocols/pom.xml
@@ -41,6 +41,7 @@
     <module>ethernetip</module>
     <module>modbus</module>
     <module>s7</module>
+    <module>test</module>
 
     <module>benchmarks</module>
   </modules>
diff --git a/plc4j/protocols/test/pom.xml b/plc4j/protocols/test/pom.xml
new file mode 100644
index 000000000..0eaaa0cce
--- /dev/null
+++ b/plc4j/protocols/test/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.plc4x</groupId>
+    <artifactId>plc4j-protocols</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>plc4j-protocol-test</artifactId>
+  <name>PLC4J: Protocol: TEST</name>
+  <description>Test implementation of a PLC4X driver.</description>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.plc4x</groupId>
+      <artifactId>plc4j-api</artifactId>
+      <version>0.0.1-SNAPSHOT</version>
+    </dependency>
+  </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestAddress.java b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestAddress.java
new file mode 100644
index 000000000..549b7f49d
--- /dev/null
+++ b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestAddress.java
@@ -0,0 +1,72 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.java.test;
+
+import org.apache.plc4x.java.api.model.Address;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Test address for accessing values in virtual devices.
+ *
+ */
+class TestAddress implements Address {
+    private static final Pattern ADDRESS_PATTERN = Pattern.compile("^\\w+$");
+
+    private final String value;
+
+    public static final TestAddress RANDOM = new TestAddress("random");
+
+    private TestAddress(String value) {
+        this.value = value;
+    }
+
+    public static boolean isValid(String addressString) {
+        Matcher matcher = ADDRESS_PATTERN.matcher(addressString);
+        return matcher.matches();
+    }
+
+    public static TestAddress of(String addressString) {
+        return new TestAddress(addressString);
+    }
+
+    @Override
+    public int hashCode() {
+        return value.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this)
+            return true;
+
+        if (!(o instanceof TestAddress))
+            return false;
+
+        TestAddress that = (TestAddress) o;
+
+        return this.value.equals(that.value);
+    }
+
+    @Override
+    public String toString() {
+        return value;
+    }
+}
diff --git a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestConnection.java b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestConnection.java
new file mode 100644
index 000000000..47a11e7f9
--- /dev/null
+++ b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestConnection.java
@@ -0,0 +1,149 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.java.test;
+
+import org.apache.plc4x.java.api.connection.*;
+import org.apache.plc4x.java.api.exceptions.PlcInvalidAddressException;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.java.api.messages.PlcWriteResponse;
+import org.apache.plc4x.java.api.messages.items.ReadRequestItem;
+import org.apache.plc4x.java.api.messages.items.ReadResponseItem;
+import org.apache.plc4x.java.api.messages.items.WriteRequestItem;
+import org.apache.plc4x.java.api.messages.items.WriteResponseItem;
+import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadRequest;
+import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadResponse;
+import org.apache.plc4x.java.api.messages.specific.TypeSafePlcWriteRequest;
+import org.apache.plc4x.java.api.messages.specific.TypeSafePlcWriteResponse;
+import org.apache.plc4x.java.api.model.Address;
+import org.apache.plc4x.java.api.types.ResponseCode;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Connection to a test device.
+ * This class is not thread-safe.
+ */
+class TestConnection implements PlcConnection, PlcReader, PlcWriter {
+    private final TestDevice device;
+    private boolean connected = false;
+
+    TestConnection(TestDevice device) {
+        this.device = device;
+    }
+
+    @Override
+    public void connect() {
+        connected = true;
+    }
+
+    @Override
+    public boolean isConnected() {
+        return connected;
+    }
+
+    @Override
+    public void close() {
+        connected = false;
+    }
+
+    @Override
+    public Address parseAddress(String addressString) throws PlcInvalidAddressException {
+        if (!TestAddress.isValid(addressString)) {
+            throw new PlcInvalidAddressException("Address must contain a single word.");
+        }
+
+        return TestAddress.of(addressString);
+    }
+
+    @Override
+    public Optional<PlcLister> getLister() {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<PlcReader> getReader() {
+        return Optional.of(this);
+    }
+
+    @Override
+    public Optional<PlcWriter> getWriter() {
+        return Optional.of(this);
+    }
+
+    @Override
+    public Optional<PlcSubscriber> getSubscriber() {
+        return Optional.empty(); // TODO: implement this
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public CompletableFuture<? extends PlcReadResponse> read(PlcReadRequest readRequest) {
+        List<ReadResponseItem<?>> responseItems = new LinkedList<>();
+        for (ReadRequestItem<?> requestItem : readRequest.getRequestItems()) {
+            TestAddress address = (TestAddress) requestItem.getAddress();
+            Optional<?> value = device.get(requestItem.getDatatype(), address);
+            ReadResponseItem<?> responseItem;
+            if (value.isPresent()) {
+                responseItem = new ReadResponseItem(requestItem, ResponseCode.OK, Collections.singletonList(value.get()));
+            } else {
+                responseItem = new ReadResponseItem(requestItem, ResponseCode.NOT_FOUND, Collections.emptyList());
+            }
+            responseItems.add(responseItem);
+        }
+        PlcReadResponse response = new PlcReadResponse(readRequest, responseItems);
+        return CompletableFuture.completedFuture(response);
+    }
+
+    @Override
+    public <T> CompletableFuture<TypeSafePlcReadResponse<T>> read(TypeSafePlcReadRequest<T> readRequest) {
+        return null; // TODO: implement this
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public CompletableFuture<? extends PlcWriteResponse> write(PlcWriteRequest writeRequest) {
+        List<WriteResponseItem<?>> responseItems = new LinkedList<>();
+        for (WriteRequestItem<?> requestItem : writeRequest.getRequestItems()) {
+            TestAddress address = (TestAddress) requestItem.getAddress();
+            Object value = requestItem.getValues().get(0);
+            device.set(address, value);
+            WriteResponseItem<?> responseItem = new WriteResponseItem(requestItem, ResponseCode.OK);
+            responseItems.add(responseItem);
+        }
+        PlcWriteResponse response = new PlcWriteResponse(writeRequest, responseItems);
+        return CompletableFuture.completedFuture(response);
+    }
+
+    @Override
+    public <T> CompletableFuture<TypeSafePlcWriteResponse<T>> write(TypeSafePlcWriteRequest<T> writeRequest) {
+        return null; // TODO: implement this
+    }
+
+    @Override
+    public String toString() {
+        return String.format("test:%s", device);
+    }
+
+}
diff --git a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java
new file mode 100644
index 000000000..59cf88391
--- /dev/null
+++ b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java
@@ -0,0 +1,76 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.java.test;
+
+import java.util.*;
+
+/**
+ * Test device storing its state in memory.
+ * Values are stored in a HashMap.
+ */
+class TestDevice {
+    private final Random random = new Random();
+    private final String name;
+    private final Map<TestAddress, Object> state = new HashMap<>();
+
+    TestDevice(String name) {
+        this.name = name;
+    }
+
+    @SuppressWarnings("unchecked")
+    <T> Optional<T> get(Class<? extends T> type, TestAddress address) {
+        Objects.requireNonNull(address);
+        if (address.equals(TestAddress.RANDOM)) {
+            return Optional.of(randomValue(type));
+        } else {
+            return Optional.ofNullable((T) state.get(address));
+        }
+    }
+
+    public void set(TestAddress address, Object value) {
+        state.put(address, value);
+    }
+
+    @SuppressWarnings("unchecked")
+    private <T> T randomValue(Class<T> type) {
+        Object result = null;
+
+        // TODO: implement for further data types
+
+        if (type == Integer.class)
+            result = random.nextInt();
+
+        if (type == Byte.class) {
+            byte[] bytes = new byte[1];
+            random.nextBytes(bytes);
+            result = bytes[0];
+        }
+
+        if (type == Short.class) {
+            result = random.nextInt(1 << 16);
+        }
+
+        return (T) result;
+    }
+
+    @Override
+    public String toString() {
+        return name;
+    }
+}
diff --git a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestPlcDriver.java b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestPlcDriver.java
new file mode 100644
index 000000000..d4f2138b4
--- /dev/null
+++ b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestPlcDriver.java
@@ -0,0 +1,61 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.java.test;
+
+import org.apache.plc4x.java.api.PlcDriver;
+import org.apache.plc4x.java.api.authentication.PlcAuthentication;
+import org.apache.plc4x.java.api.connection.PlcConnection;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+
+/**
+ * Test driver holding its state in the client process.
+ * The URL schema is {@code test:<device_name>}.
+ * Devices are created each time a connection is established and should not be reused.
+ * Every device contains a random value generator accessible by address {@code random}.
+ * Any value can be stored into test devices, however the state will be gone when connection is closed.
+ */
+public class TestPlcDriver implements PlcDriver {
+
+    @Override
+    public String getProtocolCode() {
+        return "test";
+    }
+
+    @Override
+    public String getProtocolName() {
+        return "PLC4X Test Protocol";
+    }
+
+    @Override
+    public PlcConnection connect(String url) throws PlcConnectionException {
+        // TODO: perform further checks
+        String deviceName = url.substring(5);
+        if (deviceName.isEmpty()) {
+            throw new PlcConnectionException("Invalid URL: no device name given.");
+        }
+        TestDevice device = new TestDevice(deviceName);
+        return new TestConnection(device);
+    }
+
+    @Override
+    public PlcConnection connect(String url, PlcAuthentication authentication) throws PlcConnectionException {
+        throw new PlcConnectionException("Test driver does not support authentication.");
+    }
+
+}
diff --git a/plc4j/protocols/test/src/main/resources/META-INF/services/org.apache.plc4x.java.api.PlcDriver b/plc4j/protocols/test/src/main/resources/META-INF/services/org.apache.plc4x.java.api.PlcDriver
new file mode 100644
index 000000000..f71d4366e
--- /dev/null
+++ b/plc4j/protocols/test/src/main/resources/META-INF/services/org.apache.plc4x.java.api.PlcDriver
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.plc4x.java.test.TestPlcDriver
diff --git a/plc4j/protocols/test/src/test/java/org/apache/plc4x/java/test/TestDeviceTest.java b/plc4j/protocols/test/src/test/java/org/apache/plc4x/java/test/TestDeviceTest.java
new file mode 100644
index 000000000..9af6a0a6d
--- /dev/null
+++ b/plc4j/protocols/test/src/test/java/org/apache/plc4x/java/test/TestDeviceTest.java
@@ -0,0 +1,53 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ */
+package org.apache.plc4x.java.test;
+
+import org.junit.Test;
+
+import java.util.Optional;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestDeviceTest {
+
+    @Test
+    public void random() {
+        TestDevice device = new TestDevice("foobar");
+        TestAddress address = TestAddress.RANDOM;
+
+        Optional<Integer> value = device.get(Integer.class, address);
+
+        assertTrue(value.isPresent());
+    }
+
+    @Test
+    public void read() {
+        TestDevice device = new TestDevice("foobar");
+        TestAddress address = TestAddress.of("banana");
+
+        Optional<Integer> value = device.get(Integer.class, address);
+        assertFalse(value.isPresent());
+
+        device.set(address, 42);
+        value = device.get(Integer.class, address);
+        assertTrue(value.isPresent());
+    }
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services