You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@plc4x.apache.org by GitBox <gi...@apache.org> on 2018/09/11 15:23:10 UTC

[GitHub] asfgit closed pull request #19: Add URL Field to Key Schema in Kafka Source Connector

asfgit closed pull request #19: Add URL Field to Key Schema in Kafka Source Connector
URL: https://github.com/apache/incubator-plc4x/pull/19
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
index c354a1ea2..7d0ed8621 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
@@ -20,6 +20,8 @@ Licensed to the Apache Software Foundation (ASF) under one
 
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
@@ -32,10 +34,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.plc4x.java.api.types.PlcResponseCode;
 import org.apache.plc4x.kafka.util.VersionUtil;
 
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.*;
 
 /**
@@ -48,6 +47,15 @@ Licensed to the Apache Software Foundation (ASF) under one
     private final static long WAIT_LIMIT_MILLIS = 100;
     private final static long TIMEOUT_LIMIT_MILLIS = 5000;
 
+    private final static String URL_FIELD = "url";
+    private final static String QUERY_FIELD = "query";
+
+    private final static Schema KEY_SCHEMA =
+        new SchemaBuilder(Schema.Type.STRUCT)
+            .field(URL_FIELD, Schema.STRING_SCHEMA)
+            .field(QUERY_FIELD, Schema.STRING_SCHEMA)
+            .build();
+
     private String topic;
     private String url;
     private List<String> queries;
@@ -56,6 +64,8 @@ Licensed to the Apache Software Foundation (ASF) under one
     private PlcReader plcReader;
     private PlcReadRequest plcRequest;
 
+
+
     // TODO: should we use shared (static) thread pool for this?
     private ScheduledExecutorService scheduler;
     private ScheduledFuture<?> timer;
@@ -166,11 +176,16 @@ private synchronized boolean awaitFetch(long milliseconds) throws InterruptedExc
                 continue;
             }
 
-            Object rawValue = response.getObject(query);
-            Schema valueSchema = getSchema(rawValue.getClass());
-            Object value = valueSchema.equals(Schema.STRING_SCHEMA) ? rawValue.toString() : rawValue;
+            Struct key = new Struct(KEY_SCHEMA)
+                .put(URL_FIELD, url)
+                .put(QUERY_FIELD, query);
+
+            Object value = response.getObject(query);
+            Schema valueSchema = getSchema(value);
             Long timestamp = System.currentTimeMillis();
-            Map<String, String> sourcePartition = Collections.singletonMap("url", url);
+            Map<String, String> sourcePartition = new HashMap<>();
+            sourcePartition.put("url", url);
+            sourcePartition.put("query", query);
             Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);
 
             SourceRecord record =
@@ -178,8 +193,8 @@ private synchronized boolean awaitFetch(long milliseconds) throws InterruptedExc
                     sourcePartition,
                     sourceOffset,
                     topic,
-                    Schema.STRING_SCHEMA,
-                    query,
+                    KEY_SCHEMA,
+                    key,
                     valueSchema,
                     value
                 );
@@ -190,20 +205,38 @@ private synchronized boolean awaitFetch(long milliseconds) throws InterruptedExc
         return result;
     }
 
-    private Schema getSchema(Class<?> type) {
-        if (type.equals(Byte.class))
+    private Schema getSchema(Object value) {
+        Objects.requireNonNull(value);
+
+        if (value instanceof Byte)
             return Schema.INT8_SCHEMA;
 
-        if (type.equals(Short.class))
+        if (value instanceof Short)
             return Schema.INT16_SCHEMA;
 
-        if (type.equals(Integer.class))
+        if (value instanceof Integer)
             return Schema.INT32_SCHEMA;
 
-        if (type.equals(Long.class))
+        if (value instanceof Long)
             return Schema.INT64_SCHEMA;
 
-        return Schema.STRING_SCHEMA; // default case; invoke .toString on value
+        if (value instanceof Float)
+            return Schema.FLOAT32_SCHEMA;
+
+        if (value instanceof Double)
+            return Schema.FLOAT64_SCHEMA;
+
+        if (value instanceof Boolean)
+            return Schema.BOOLEAN_SCHEMA;
+
+        if (value instanceof String)
+            return Schema.STRING_SCHEMA;
+
+        if (value instanceof byte[])
+            return Schema.BYTES_SCHEMA;
+
+        // TODO: add support for collective and complex types
+        throw new ConnectException(String.format("Unsupported data type %s", value.getClass().getName()));
     }
 
 }
\ No newline at end of file
diff --git a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java
index 3966deb8a..b93e545e4 100644
--- a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java
+++ b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java
@@ -69,22 +69,45 @@ void set(TestField field, FieldItem value) {
     private FieldItem randomValue(Class<?> type) {
         Object result = null;
 
-        // TODO: implement for further data types
+        if (type.equals(Byte.class))
+            result = (byte) random.nextInt(1 << 8);
 
-        if (type == Integer.class)
+        if (type.equals(Short.class))
+            result = (short) random.nextInt(1 << 16);
+
+        if (type.equals(Integer.class))
             result = random.nextInt();
 
-        if (type == Byte.class) {
-            byte[] bytes = new byte[1];
-            random.nextBytes(bytes);
-            result = bytes[0];
+        if (type.equals(Long.class))
+            result = random.nextLong();
+
+        if (type.equals(Float.class))
+            result = random.nextFloat();
+
+        if (type.equals(Double.class))
+            result = random.nextDouble();
+
+        if (type.equals(Boolean.class))
+            result = random.nextBoolean();
+
+        if (type.equals(String.class)) {
+            int length = random.nextInt(100);
+            StringBuilder sb = new StringBuilder(length);
+            for (int i = 0; i < length; i++) {
+                char c = (char)('a' + random.nextInt(26));
+                sb.append(c);
+            }
+            result = sb.toString();
         }
 
-        if (type == Short.class) {
-            result = random.nextInt(1 << 16);
+        if (type.equals(byte[].class)) {
+            int length = random.nextInt(100);
+            byte[] bytes = new byte[length];
+            random.nextBytes(bytes);
+            result = bytes;
         }
 
-        return new TestFieldItem(new Object[]{result});
+        return new TestFieldItem(new Object[] { result });
     }
 
     @Override


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services