Posted to commits@plc4x.apache.org by cd...@apache.org on 2019/08/24 17:37:06 UTC

[plc4x] 02/02: - Finished a first fully operational version of the Kafka Connect Source

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch feature/kafka-connect-refactoring
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 74c892cda34d2ac9d1f1b7eb94b2d55bfba60c6e
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Sat Aug 24 19:36:58 2019 +0200

    - Finished a first fully operational version of the Kafka Connect Source
---
 .../org/apache/plc4x/kafka/Plc4xSourceTask.java    | 175 ++++++++++++---------
 1 file changed, 101 insertions(+), 74 deletions(-)

diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
index 87ede4e..81b0d62 100644
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
@@ -27,7 +27,6 @@ import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.scraper.ResultHandler;
 import org.apache.plc4x.java.scraper.config.triggeredscraper.JobConfigurationTriggeredImplBuilder;
 import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImpl;
 import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImplBuilder;
@@ -40,6 +39,10 @@ import org.apache.plc4x.kafka.util.VersionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.util.*;
 import java.util.concurrent.*;
 
@@ -76,21 +79,13 @@ public class Plc4xSourceTask extends SourceTask {
      */
     private static final String SOURCE_NAME_FIELD = "source-name";
     private static final String JOB_NAME_FIELD = "job-name";
-    private static final String FIELD_NAME_FIELD = "field-name";
 
     private static final Schema KEY_SCHEMA =
         new SchemaBuilder(Schema.Type.STRUCT)
             .field(SOURCE_NAME_FIELD, Schema.STRING_SCHEMA)
             .field(JOB_NAME_FIELD, Schema.STRING_SCHEMA)
-            .field(FIELD_NAME_FIELD, Schema.STRING_SCHEMA)
             .build();
 
-    // Internal properties.
-    private Map<String, String> topics;
-    private PlcDriverManager plcDriverManager;
-    private TriggerCollector triggerCollector;
-    private TriggeredScraperImpl scraper;
-
     // Internal buffer into which all incoming scraper responses are written.
     private ArrayBlockingQueue<SourceRecord> buffer;
 
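The scraper callback further down only ever writes into this buffer; Kafka Connect drains it through the task's poll() method, which this diff does not touch. As a rough sketch of how such a poll() could hand the buffered records over (standard ArrayBlockingQueue and SourceTask APIs; the actual implementation in the repository may differ):

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        // Block until the scraper has produced at least one record ...
        List<SourceRecord> records = new ArrayList<>();
        records.add(buffer.take());
        // ... then also take everything else that is already queued.
        buffer.drainTo(records);
        return records;
    }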
@@ -104,7 +99,7 @@ public class Plc4xSourceTask extends SourceTask {
         AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
         String connectionName = config.getString(CONNECTION_NAME_CONFIG);
         String plc4xConnectionString = config.getString(PLC4X_CONNECTION_STRING_CONFIG);
-        topics = new HashMap<>();
+        Map<String, String> topics = new HashMap<>();
         // Create a buffer with a capacity of 1000 elements which schedules access in a fair way.
         buffer = new ArrayBlockingQueue<>(1000, true);
 
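Everything the task needs is passed in through the string map given to start(). A hypothetical invocation could look like this (the literal key strings "name" and "connectionString" are assumptions; the authoritative keys are the CONNECTION_NAME_CONFIG and PLC4X_CONNECTION_STRING_CONFIG constants, whose values this excerpt does not show):

    Map<String, String> props = new HashMap<>();
    props.put("name", "machine-a");                        // CONNECTION_NAME_CONFIG (assumed key)
    props.put("connectionString", "s7://192.168.0.1/0/0"); // PLC4X_CONNECTION_STRING_CONFIG (assumed key)

    Plc4xSourceTask task = new Plc4xSourceTask();
    task.start(props);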
@@ -145,51 +140,58 @@ public class Plc4xSourceTask extends SourceTask {
         ScraperConfigurationTriggeredImpl scraperConfig = builder.build();
 
         try {
-            plcDriverManager = new PooledPlcDriverManager();
-            triggerCollector = new TriggerCollectorImpl(plcDriverManager);
-            scraper = new TriggeredScraperImpl(scraperConfig, new ResultHandler() {
-                @Override
-                public void handle(String jobName, String sourceName, Map<String, Object> results) {
-                    Long timestamp = System.currentTimeMillis();
-
-                    Map<String, String> sourcePartition = new HashMap<>();
-                    sourcePartition.put("sourceName", sourceName);
-                    sourcePartition.put("jobName", jobName);
-
-                    Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);
-
-                    String topic = topics.get(jobName);
-
-                    for (Map.Entry<String, Object> result : results.entrySet()) {
-                        // Get field-name and -value from the results.
-                        String fieldName = result.getKey();
-                        Object fieldValue = result.getValue();
-
-                        // Prepare the key structure.
-                        Struct key = new Struct(KEY_SCHEMA)
-                            .put(SOURCE_NAME_FIELD, sourceName)
-                            .put(JOB_NAME_FIELD, jobName)
-                            .put(FIELD_NAME_FIELD, fieldName);
-
-                        // Get the schema for the given value type.
-                        Schema valueSchema = getSchema(fieldValue);
-
-                        // Prepare the source-record element.
-                        SourceRecord record =
-                            new SourceRecord(
-                                sourcePartition,
-                                sourceOffset,
-                                topic,
-                                KEY_SCHEMA,
-                                key,
-                                valueSchema,
-                                fieldValue
-                            );
-
-                        // Add the new source-record to the buffer.
-                        buffer.add(record);
-                    }
+            PlcDriverManager plcDriverManager = new PooledPlcDriverManager();
+            TriggerCollector triggerCollector = new TriggerCollectorImpl(plcDriverManager);
+            TriggeredScraperImpl scraper = new TriggeredScraperImpl(scraperConfig, (jobName, sourceName, results) -> {
+                Long timestamp = System.currentTimeMillis();
+
+                Map<String, String> sourcePartition = new HashMap<>();
+                sourcePartition.put("sourceName", sourceName);
+                sourcePartition.put("jobName", jobName);
+
+                Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);
+
+                String topic = topics.get(jobName);
+
+                // Prepare the key structure.
+                Struct key = new Struct(KEY_SCHEMA)
+                    .put(SOURCE_NAME_FIELD, sourceName)
+                    .put(JOB_NAME_FIELD, jobName);
+
+                // Build the Schema for the result struct.
+                SchemaBuilder recordSchemaBuilder = SchemaBuilder.struct().name("org.apache.plc4x.kafka.JobResult");
+                for (Map.Entry<String, Object> result : results.entrySet()) {
+                    // Get field-name and -value from the results.
+                    String fieldName = result.getKey();
+                    Object fieldValue = result.getValue();
+
+                    // Get the schema for the given value type.
+                    Schema valueSchema = getSchema(fieldValue);
+
+                    // Add the schema description for the current field.
+                    recordSchemaBuilder.field(fieldName, valueSchema);
+                }
+                Schema recordSchema = recordSchemaBuilder.build();
+
+                // Build the struct itself.
+                Struct recordStruct = new Struct(recordSchema);
+                for (Map.Entry<String, Object> result : results.entrySet()) {
+                    // Get field-name and -value from the results.
+                    String fieldName = result.getKey();
+                    Object fieldValue = result.getValue();
+                    recordStruct.put(fieldName, fieldValue);
                 }
+
+                // Prepare the source-record element.
+                SourceRecord record = new SourceRecord(
+                    sourcePartition, sourceOffset,
+                    topic,
+                    KEY_SCHEMA, key,
+                    recordSchema, recordStruct
+                    );
+
+                // Add the new source-record to the buffer.
+                buffer.add(record);
             }, triggerCollector);
             scraper.start();
             triggerCollector.start();
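The handler above uses a two-pass pattern: it first derives a struct schema from the types of the scraped values and then fills a Struct against that schema. The same pattern in isolation, as a self-contained sketch against the Kafka Connect data API (field names and values are invented for illustration):

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class JobResultSchemaExample {
        public static void main(String[] args) {
            // Simulated scraper results: field-name -> field-value.
            Map<String, Object> results = new LinkedHashMap<>();
            results.put("motor-current", 7.35f);
            results.put("cycle-count", 42);

            // Pass 1: derive the record schema from the value types.
            SchemaBuilder builder = SchemaBuilder.struct()
                .name("org.apache.plc4x.kafka.JobResult");
            builder.field("motor-current", Schema.FLOAT32_SCHEMA);
            builder.field("cycle-count", Schema.INT32_SCHEMA);
            Schema recordSchema = builder.build();

            // Pass 2: fill a struct conforming to that schema.
            Struct recordStruct = new Struct(recordSchema);
            for (Map.Entry<String, Object> result : results.entrySet()) {
                recordStruct.put(result.getKey(), result.getValue());
            }

            // Prints: Struct{motor-current=7.35,cycle-count=42}
            System.out.println(recordStruct);
        }
    }

Building the schema in every callback keeps the task independent of the job definition, at the price of constructing a fresh Schema object on every scrape cycle.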
@@ -221,33 +223,58 @@ public class Plc4xSourceTask extends SourceTask {
     private Schema getSchema(Object value) {
         Objects.requireNonNull(value);
 
-        if (value instanceof Byte)
-            return Schema.INT8_SCHEMA;
-
-        if (value instanceof Short)
-            return Schema.INT16_SCHEMA;
-
-        if (value instanceof Integer)
-            return Schema.INT32_SCHEMA;
+        if (value instanceof List) {
+            List<?> list = (List<?>) value;
+            if (list.isEmpty()) {
+                throw new ConnectException("Empty lists are not supported.");
+            }
+            // In PLC4X all elements of a list have the same type,
+            // so the first element determines the array schema.
+            Object firstElement = list.get(0);
+            Schema elementSchema = getSchema(firstElement);
+            return SchemaBuilder.array(elementSchema).build();
+        }
+        if (value instanceof BigDecimal) {
+            // TODO: no Kafka Connect schema mapping yet; BigDecimal values
+            // currently fall through to the unsupported-type exception below.
+        }
-        if (value instanceof Long)
-            return Schema.INT64_SCHEMA;
-        if (value instanceof Float)
+        if (value instanceof Boolean) {
+            return Schema.BOOLEAN_SCHEMA;
+        }
+        if (value instanceof byte[]) {
+            return Schema.BYTES_SCHEMA;
+        }
+        if (value instanceof Byte) {
+            return Schema.INT8_SCHEMA;
+        }
+        if (value instanceof Double) {
+            return Schema.FLOAT64_SCHEMA;
+        }
+        if (value instanceof Float) {
             return Schema.FLOAT32_SCHEMA;
+        }
+        if (value instanceof Integer) {
+            return Schema.INT32_SCHEMA;
+        }
+        if (value instanceof LocalDate) {
+            // TODO: no Kafka Connect schema mapping yet; falls through to
+            // the unsupported-type exception below.
+        }
-        if (value instanceof Double)
-            return Schema.FLOAT64_SCHEMA;
+        if (value instanceof LocalDateTime) {
+            // TODO: no Kafka Connect schema mapping yet.
+        }
-        if (value instanceof Boolean)
-            return Schema.BOOLEAN_SCHEMA;
+        if (value instanceof LocalTime) {
+            // TODO: no Kafka Connect schema mapping yet.
+        }
-        if (value instanceof String)
+        if (value instanceof Long) {
+            return Schema.INT64_SCHEMA;
+        }
+        if (value instanceof Short) {
+            return Schema.INT16_SCHEMA;
+        }
+        if (value instanceof String) {
             return Schema.STRING_SCHEMA;
-
-        if (value instanceof byte[])
-            return Schema.BYTES_SCHEMA;
-
+        }
         // TODO: add support for collective and complex types
         throw new ConnectException(String.format("Unsupported data type %s", value.getClass().getName()));
     }
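The empty branches above leave BigDecimal, LocalDate, LocalDateTime and LocalTime unsupported for now; they fall through to the exception at the end. One conceivable way to fill them with Kafka Connect's built-in logical types (a sketch, not what this commit implements):

    import org.apache.kafka.connect.data.Date;
    import org.apache.kafka.connect.data.Decimal;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.Time;
    import org.apache.kafka.connect.data.Timestamp;

    // ...

    if (value instanceof BigDecimal) {
        // Logical decimal type; the scale is carried in the schema.
        return Decimal.schema(((BigDecimal) value).scale());
    }
    if (value instanceof LocalDate) {
        return Date.SCHEMA;       // days since the epoch
    }
    if (value instanceof LocalDateTime) {
        return Timestamp.SCHEMA;  // milliseconds since the epoch
    }
    if (value instanceof LocalTime) {
        return Time.SCHEMA;       // milliseconds since midnight
    }

Note that the Date, Time and Timestamp schemas expect java.util.Date values, so the java.time values would additionally have to be converted before being put into the record struct; returning a schema alone is not sufficient.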