You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2018/09/17 07:07:50 UTC

[incubator-plc4x] 06/29: added url to source key schema

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit fb5513807eec8658eaed12fdf4419ccc45d59c9c
Author: Andrey Skorikov <an...@codecentric.de>
AuthorDate: Tue Sep 11 15:57:04 2018 +0200

    added url to source key schema
---
 .../org/apache/plc4x/kafka/Plc4xSourceTask.java    | 65 ++++++++++++++++------
 .../org/apache/plc4x/java/test/TestDevice.java     | 42 +++++++++++---
 2 files changed, 82 insertions(+), 25 deletions(-)

diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
index c354a1e..7d0ed86 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
@@ -20,6 +20,8 @@ package org.apache.plc4x.kafka;
 
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
@@ -32,10 +34,7 @@ import org.apache.plc4x.java.api.messages.PlcReadResponse;
 import org.apache.plc4x.java.api.types.PlcResponseCode;
 import org.apache.plc4x.kafka.util.VersionUtil;
 
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.*;
 
 /**
@@ -48,6 +47,15 @@ public class Plc4xSourceTask extends SourceTask {
     private final static long WAIT_LIMIT_MILLIS = 100;
     private final static long TIMEOUT_LIMIT_MILLIS = 5000;
 
+    private final static String URL_FIELD = "url";
+    private final static String QUERY_FIELD = "query";
+
+    private final static Schema KEY_SCHEMA =
+        new SchemaBuilder(Schema.Type.STRUCT)
+            .field(URL_FIELD, Schema.STRING_SCHEMA)
+            .field(QUERY_FIELD, Schema.STRING_SCHEMA)
+            .build();
+
     private String topic;
     private String url;
     private List<String> queries;
@@ -56,6 +64,8 @@ public class Plc4xSourceTask extends SourceTask {
     private PlcReader plcReader;
     private PlcReadRequest plcRequest;
 
+
+
     // TODO: should we use shared (static) thread pool for this?
     private ScheduledExecutorService scheduler;
     private ScheduledFuture<?> timer;
@@ -166,11 +176,16 @@ public class Plc4xSourceTask extends SourceTask {
                 continue;
             }
 
-            Object rawValue = response.getObject(query);
-            Schema valueSchema = getSchema(rawValue.getClass());
-            Object value = valueSchema.equals(Schema.STRING_SCHEMA) ? rawValue.toString() : rawValue;
+            Struct key = new Struct(KEY_SCHEMA)
+                .put(URL_FIELD, url)
+                .put(QUERY_FIELD, query);
+
+            Object value = response.getObject(query);
+            Schema valueSchema = getSchema(value);
             Long timestamp = System.currentTimeMillis();
-            Map<String, String> sourcePartition = Collections.singletonMap("url", url);
+            Map<String, String> sourcePartition = new HashMap<>();
+            sourcePartition.put("url", url);
+            sourcePartition.put("query", query);
             Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);
 
             SourceRecord record =
@@ -178,8 +193,8 @@ public class Plc4xSourceTask extends SourceTask {
                     sourcePartition,
                     sourceOffset,
                     topic,
-                    Schema.STRING_SCHEMA,
-                    query,
+                    KEY_SCHEMA,
+                    key,
                     valueSchema,
                     value
                 );
@@ -190,20 +205,38 @@ public class Plc4xSourceTask extends SourceTask {
         return result;
     }
 
-    private Schema getSchema(Class<?> type) {
-        if (type.equals(Byte.class))
+    private Schema getSchema(Object value) {
+        Objects.requireNonNull(value);
+
+        if (value instanceof Byte)
             return Schema.INT8_SCHEMA;
 
-        if (type.equals(Short.class))
+        if (value instanceof Short)
             return Schema.INT16_SCHEMA;
 
-        if (type.equals(Integer.class))
+        if (value instanceof Integer)
             return Schema.INT32_SCHEMA;
 
-        if (type.equals(Long.class))
+        if (value instanceof Long)
             return Schema.INT64_SCHEMA;
 
-        return Schema.STRING_SCHEMA; // default case; invoke .toString on value
+        if (value instanceof Float)
+            return Schema.FLOAT32_SCHEMA;
+
+        if (value instanceof Double)
+            return Schema.FLOAT64_SCHEMA;
+
+        if (value instanceof Boolean)
+            return Schema.BOOLEAN_SCHEMA;
+
+        if (value instanceof String)
+            return Schema.STRING_SCHEMA;
+
+        if (value instanceof byte[])
+            return Schema.BYTES_SCHEMA;
+
+        // TODO: add support for collective and complex types
+        throw new ConnectException(String.format("Unsupported data type %s", value.getClass().getName()));
     }
 
 }
\ No newline at end of file
diff --git a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java
index 3966deb..3fcfc1d 100644
--- a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java
+++ b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java
@@ -20,6 +20,7 @@ package org.apache.plc4x.java.test;
 
 import org.apache.plc4x.java.base.messages.items.FieldItem;
 
+import java.lang.reflect.Array;
 import java.util.*;
 
 /**
@@ -69,22 +70,45 @@ class TestDevice {
     private FieldItem randomValue(Class<?> type) {
         Object result = null;
 
-        // TODO: implement for further data types
+        if (type.equals(Byte.class))
+            result = (byte) random.nextInt(1 << 8);
 
-        if (type == Integer.class)
+        if (type.equals(Short.class))
+            result = (short) random.nextInt(1 << 16);
+
+        if (type.equals(Integer.class))
             result = random.nextInt();
 
-        if (type == Byte.class) {
-            byte[] bytes = new byte[1];
-            random.nextBytes(bytes);
-            result = bytes[0];
+        if (type.equals(Long.class))
+            result = random.nextLong();
+
+        if (type.equals(Float.class))
+            result = random.nextFloat();
+
+        if (type.equals(Double.class))
+            result = random.nextDouble();
+
+        if (type.equals(Boolean.class))
+            result = random.nextBoolean();
+
+        if (type.equals(String.class)) {
+            int length = random.nextInt(100);
+            StringBuilder sb = new StringBuilder(length);
+            for (int i = 0; i < length; i++) {
+                char c = (char)('a' + random.nextInt(26));
+                sb.append(c);
+            }
+            result = sb.toString();
         }
 
-        if (type == Short.class) {
-            result = random.nextInt(1 << 16);
+        if (type.equals(byte[].class)) {
+            int length = random.nextInt(100);
+            byte[] bytes = new byte[length];
+            random.nextBytes(bytes);
+            result = bytes;
         }
 
-        return new TestFieldItem(new Object[]{result});
+        return new TestFieldItem(new Object[] { result });
     }
 
     @Override