You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2018/09/17 07:07:50 UTC
[incubator-plc4x] 06/29: added url to source key schema
This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
commit fb5513807eec8658eaed12fdf4419ccc45d59c9c
Author: Andrey Skorikov <an...@codecentric.de>
AuthorDate: Tue Sep 11 15:57:04 2018 +0200
added url to source key schema
---
.../org/apache/plc4x/kafka/Plc4xSourceTask.java | 65 ++++++++++++++++------
.../org/apache/plc4x/java/test/TestDevice.java | 42 +++++++++++---
2 files changed, 82 insertions(+), 25 deletions(-)
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
index c354a1e..7d0ed86 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
@@ -20,6 +20,8 @@ package org.apache.plc4x.kafka;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
@@ -32,10 +34,7 @@ import org.apache.plc4x.java.api.messages.PlcReadResponse;
import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.kafka.util.VersionUtil;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.*;
/**
@@ -48,6 +47,15 @@ public class Plc4xSourceTask extends SourceTask {
private final static long WAIT_LIMIT_MILLIS = 100;
private final static long TIMEOUT_LIMIT_MILLIS = 5000;
+ private final static String URL_FIELD = "url";
+ private final static String QUERY_FIELD = "query";
+
+ private final static Schema KEY_SCHEMA =
+ new SchemaBuilder(Schema.Type.STRUCT)
+ .field(URL_FIELD, Schema.STRING_SCHEMA)
+ .field(QUERY_FIELD, Schema.STRING_SCHEMA)
+ .build();
+
private String topic;
private String url;
private List<String> queries;
@@ -56,6 +64,8 @@ public class Plc4xSourceTask extends SourceTask {
private PlcReader plcReader;
private PlcReadRequest plcRequest;
+
+
// TODO: should we use shared (static) thread pool for this?
private ScheduledExecutorService scheduler;
private ScheduledFuture<?> timer;
@@ -166,11 +176,16 @@ public class Plc4xSourceTask extends SourceTask {
continue;
}
- Object rawValue = response.getObject(query);
- Schema valueSchema = getSchema(rawValue.getClass());
- Object value = valueSchema.equals(Schema.STRING_SCHEMA) ? rawValue.toString() : rawValue;
+ Struct key = new Struct(KEY_SCHEMA)
+ .put(URL_FIELD, url)
+ .put(QUERY_FIELD, query);
+
+ Object value = response.getObject(query);
+ Schema valueSchema = getSchema(value);
Long timestamp = System.currentTimeMillis();
- Map<String, String> sourcePartition = Collections.singletonMap("url", url);
+ Map<String, String> sourcePartition = new HashMap<>();
+ sourcePartition.put("url", url);
+ sourcePartition.put("query", query);
Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);
SourceRecord record =
@@ -178,8 +193,8 @@ public class Plc4xSourceTask extends SourceTask {
sourcePartition,
sourceOffset,
topic,
- Schema.STRING_SCHEMA,
- query,
+ KEY_SCHEMA,
+ key,
valueSchema,
value
);
@@ -190,20 +205,38 @@ public class Plc4xSourceTask extends SourceTask {
return result;
}
- private Schema getSchema(Class<?> type) {
- if (type.equals(Byte.class))
+ private Schema getSchema(Object value) {
+ Objects.requireNonNull(value);
+
+ if (value instanceof Byte)
return Schema.INT8_SCHEMA;
- if (type.equals(Short.class))
+ if (value instanceof Short)
return Schema.INT16_SCHEMA;
- if (type.equals(Integer.class))
+ if (value instanceof Integer)
return Schema.INT32_SCHEMA;
- if (type.equals(Long.class))
+ if (value instanceof Long)
return Schema.INT64_SCHEMA;
- return Schema.STRING_SCHEMA; // default case; invoke .toString on value
+ if (value instanceof Float)
+ return Schema.FLOAT32_SCHEMA;
+
+ if (value instanceof Double)
+ return Schema.FLOAT64_SCHEMA;
+
+ if (value instanceof Boolean)
+ return Schema.BOOLEAN_SCHEMA;
+
+ if (value instanceof String)
+ return Schema.STRING_SCHEMA;
+
+ if (value instanceof byte[])
+ return Schema.BYTES_SCHEMA;
+
+ // TODO: add support for collective and complex types
+ throw new ConnectException(String.format("Unsupported data type %s", value.getClass().getName()));
}
}
\ No newline at end of file
diff --git a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java
index 3966deb..3fcfc1d 100644
--- a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java
+++ b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java
@@ -20,6 +20,7 @@ package org.apache.plc4x.java.test;
import org.apache.plc4x.java.base.messages.items.FieldItem;
+import java.lang.reflect.Array;
import java.util.*;
/**
@@ -69,22 +70,45 @@ class TestDevice {
private FieldItem randomValue(Class<?> type) {
Object result = null;
- // TODO: implement for further data types
+ if (type.equals(Byte.class))
+ result = (byte) random.nextInt(1 << 8);
- if (type == Integer.class)
+ if (type.equals(Short.class))
+ result = (short) random.nextInt(1 << 16);
+
+ if (type.equals(Integer.class))
result = random.nextInt();
- if (type == Byte.class) {
- byte[] bytes = new byte[1];
- random.nextBytes(bytes);
- result = bytes[0];
+ if (type.equals(Long.class))
+ result = random.nextLong();
+
+ if (type.equals(Float.class))
+ result = random.nextFloat();
+
+ if (type.equals(Double.class))
+ result = random.nextDouble();
+
+ if (type.equals(Boolean.class))
+ result = random.nextBoolean();
+
+ if (type.equals(String.class)) {
+ int length = random.nextInt(100);
+ StringBuilder sb = new StringBuilder(length);
+ for (int i = 0; i < length; i++) {
+ char c = (char)('a' + random.nextInt(26));
+ sb.append(c);
+ }
+ result = sb.toString();
}
- if (type == Short.class) {
- result = random.nextInt(1 << 16);
+ if (type.equals(byte[].class)) {
+ int length = random.nextInt(100);
+ byte[] bytes = new byte[length];
+ random.nextBytes(bytes);
+ result = bytes;
}
- return new TestFieldItem(new Object[]{result});
+ return new TestFieldItem(new Object[] { result });
}
@Override