You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2019/08/24 17:37:06 UTC
[plc4x] 02/02: - Finished a first fully operational version of the
Kafka Connect Source
This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch featule/kafka-connect-refactoring
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 74c892cda34d2ac9d1f1b7eb94b2d55bfba60c6e
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Sat Aug 24 19:36:58 2019 +0200
- Finished a first fully operational version of the Kafka Connect Source
---
.../org/apache/plc4x/kafka/Plc4xSourceTask.java | 175 ++++++++++++---------
1 file changed, 101 insertions(+), 74 deletions(-)
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
index 87ede4e..81b0d62 100644
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
@@ -27,7 +27,6 @@ import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.scraper.ResultHandler;
import org.apache.plc4x.java.scraper.config.triggeredscraper.JobConfigurationTriggeredImplBuilder;
import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImpl;
import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImplBuilder;
@@ -40,6 +39,10 @@ import org.apache.plc4x.kafka.util.VersionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
import java.util.*;
import java.util.concurrent.*;
@@ -76,21 +79,13 @@ public class Plc4xSourceTask extends SourceTask {
*/
private static final String SOURCE_NAME_FIELD = "source-name";
private static final String JOB_NAME_FIELD = "job-name";
- private static final String FIELD_NAME_FIELD = "field-name";
private static final Schema KEY_SCHEMA =
new SchemaBuilder(Schema.Type.STRUCT)
.field(SOURCE_NAME_FIELD, Schema.STRING_SCHEMA)
.field(JOB_NAME_FIELD, Schema.STRING_SCHEMA)
- .field(FIELD_NAME_FIELD, Schema.STRING_SCHEMA)
.build();
- // Internal properties.
- private Map<String, String> topics;
- private PlcDriverManager plcDriverManager;
- private TriggerCollector triggerCollector;
- private TriggeredScraperImpl scraper;
-
// Internal buffer into which all incoming scraper responses are written to.
private ArrayBlockingQueue<SourceRecord> buffer;
@@ -104,7 +99,7 @@ public class Plc4xSourceTask extends SourceTask {
AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
String connectionName = config.getString(CONNECTION_NAME_CONFIG);
String plc4xConnectionString = config.getString(PLC4X_CONNECTION_STRING_CONFIG);
- topics = new HashMap<>();
+ Map<String, String> topics = new HashMap<>();
// Create a buffer with a capacity of 1000 elements which schedules access in a fair way.
buffer = new ArrayBlockingQueue<>(1000, true);
@@ -145,51 +140,58 @@ public class Plc4xSourceTask extends SourceTask {
ScraperConfigurationTriggeredImpl scraperConfig = builder.build();
try {
- plcDriverManager = new PooledPlcDriverManager();
- triggerCollector = new TriggerCollectorImpl(plcDriverManager);
- scraper = new TriggeredScraperImpl(scraperConfig, new ResultHandler() {
- @Override
- public void handle(String jobName, String sourceName, Map<String, Object> results) {
- Long timestamp = System.currentTimeMillis();
-
- Map<String, String> sourcePartition = new HashMap<>();
- sourcePartition.put("sourceName", sourceName);
- sourcePartition.put("jobName", jobName);
-
- Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);
-
- String topic = topics.get(jobName);
-
- for (Map.Entry<String, Object> result : results.entrySet()) {
- // Get field-name and -value from the results.
- String fieldName = result.getKey();
- Object fieldValue = result.getValue();
-
- // Prepare the key structure.
- Struct key = new Struct(KEY_SCHEMA)
- .put(SOURCE_NAME_FIELD, sourceName)
- .put(JOB_NAME_FIELD, jobName)
- .put(FIELD_NAME_FIELD, fieldName);
-
- // Get the schema for the given value type.
- Schema valueSchema = getSchema(fieldValue);
-
- // Prepare the source-record element.
- SourceRecord record =
- new SourceRecord(
- sourcePartition,
- sourceOffset,
- topic,
- KEY_SCHEMA,
- key,
- valueSchema,
- fieldValue
- );
-
- // Add the new source-record to the buffer.
- buffer.add(record);
- }
+ PlcDriverManager plcDriverManager = new PooledPlcDriverManager();
+ TriggerCollector triggerCollector = new TriggerCollectorImpl(plcDriverManager);
+ TriggeredScraperImpl scraper = new TriggeredScraperImpl(scraperConfig, (jobName, sourceName, results) -> {
+ Long timestamp = System.currentTimeMillis();
+
+ Map<String, String> sourcePartition = new HashMap<>();
+ sourcePartition.put("sourceName", sourceName);
+ sourcePartition.put("jobName", jobName);
+
+ Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);
+
+ String topic = topics.get(jobName);
+
+ // Prepare the key structure.
+ Struct key = new Struct(KEY_SCHEMA)
+ .put(SOURCE_NAME_FIELD, sourceName)
+ .put(JOB_NAME_FIELD, jobName);
+
+ // Build the Schema for the result struct.
+ SchemaBuilder recordSchemaBuilder = SchemaBuilder.struct().name("org.apache.plc4x.kafka.JobResult");
+ for (Map.Entry<String, Object> result : results.entrySet()) {
+ // Get field-name and -value from the results.
+ String fieldName = result.getKey();
+ Object fieldValue = result.getValue();
+
+ // Get the schema for the given value type.
+ Schema valueSchema = getSchema(fieldValue);
+
+ // Add the schema description for the current field.
+ recordSchemaBuilder.field(fieldName, valueSchema);
+ }
+ Schema recordSchema = recordSchemaBuilder.build();
+
+ // Build the struct itself.
+ Struct recordStruct = new Struct(recordSchema);
+ for (Map.Entry<String, Object> result : results.entrySet()) {
+ // Get field-name and -value from the results.
+ String fieldName = result.getKey();
+ Object fieldValue = result.getValue();
+ recordStruct.put(fieldName, fieldValue);
}
+
+ // Prepare the source-record element.
+ SourceRecord record = new SourceRecord(
+ sourcePartition, sourceOffset,
+ topic,
+ KEY_SCHEMA, key,
+ recordSchema, recordStruct
+ );
+
+ // Add the new source-record to the buffer.
+ buffer.add(record);
}, triggerCollector);
scraper.start();
triggerCollector.start();
@@ -221,33 +223,58 @@ public class Plc4xSourceTask extends SourceTask {
private Schema getSchema(Object value) {
Objects.requireNonNull(value);
- if (value instanceof Byte)
- return Schema.INT8_SCHEMA;
-
- if (value instanceof Short)
- return Schema.INT16_SCHEMA;
-
- if (value instanceof Integer)
- return Schema.INT32_SCHEMA;
+ if(value instanceof List) {
+ List list = (List) value;
+ if(list.isEmpty()) {
+ throw new ConnectException("Unsupported empty lists.");
+ }
+ // In PLC4X list elements all contain the same type.
+ Object firstElement = list.get(0);
+ Schema elementSchema = getSchema(firstElement);
+ return SchemaBuilder.array(elementSchema).build();
+ }
+ if (value instanceof BigDecimal) {
- if (value instanceof Long)
- return Schema.INT64_SCHEMA;
+ }
+ if (value instanceof BigDecimal) {
- if (value instanceof Float)
+ }
+ if (value instanceof Boolean) {
+ return Schema.BOOLEAN_SCHEMA;
+ }
+ if (value instanceof byte[]) {
+ return Schema.BYTES_SCHEMA;
+ }
+ if (value instanceof Byte) {
+ return Schema.INT8_SCHEMA;
+ }
+ if (value instanceof Double) {
+ return Schema.FLOAT64_SCHEMA;
+ }
+ if (value instanceof Float) {
return Schema.FLOAT32_SCHEMA;
+ }
+ if (value instanceof Integer) {
+ return Schema.INT32_SCHEMA;
+ }
+ if (value instanceof LocalDate) {
- if (value instanceof Double)
- return Schema.FLOAT64_SCHEMA;
+ }
+ if (value instanceof LocalDateTime) {
- if (value instanceof Boolean)
- return Schema.BOOLEAN_SCHEMA;
+ }
+ if (value instanceof LocalTime) {
- if (value instanceof String)
+ }
+ if (value instanceof Long) {
+ return Schema.INT64_SCHEMA;
+ }
+ if (value instanceof Short) {
+ return Schema.INT16_SCHEMA;
+ }
+ if (value instanceof String) {
return Schema.STRING_SCHEMA;
-
- if (value instanceof byte[])
- return Schema.BYTES_SCHEMA;
-
+ }
// TODO: add support for collective and complex types
throw new ConnectException(String.format("Unsupported data type %s", value.getClass().getName()));
}