You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2018/09/11 13:26:16 UTC

[incubator-plc4x] 01/05: added support for multiple queries in kafka source connector

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch feature/api-redesign-chris-c
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 4250310b29384d5accae029d14f559ca9ab85e8c
Author: Andrey Skorikov <an...@codecentric.de>
AuthorDate: Tue Sep 11 11:59:05 2018 +0200

    added support for multiple queries in kafka source connector
---
 .../org/apache/plc4x/kafka/Plc4xSinkConnector.java |  8 ++-
 .../java/org/apache/plc4x/kafka/Plc4xSinkTask.java | 10 +--
 .../apache/plc4x/kafka/Plc4xSourceConnector.java   | 42 +++++++------
 .../org/apache/plc4x/kafka/Plc4xSourceTask.java    | 72 +++++++++++++---------
 4 files changed, 76 insertions(+), 56 deletions(-)

diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
index 45ae926..1899208 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
@@ -18,6 +18,7 @@ under the License.
 */
 package org.apache.plc4x.kafka;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.sink.SinkConnector;
@@ -35,7 +36,7 @@ public class Plc4xSinkConnector extends SinkConnector {
     static final String QUERY_CONFIG = "query";
     private static final String QUERY_DOC = "Field query to be sent to the PLC";
 
-    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+    static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC)
         .define(QUERY_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, QUERY_DOC);
 
@@ -59,8 +60,9 @@ public class Plc4xSinkConnector extends SinkConnector {
 
     @Override
     public void start(Map<String, String> props) {
-        url = props.get(URL_CONFIG);
-        query = props.get(QUERY_CONFIG);
+        AbstractConfig config = new AbstractConfig(Plc4xSinkConnector.CONFIG_DEF, props);
+        url = config.getString(URL_CONFIG);
+        query = config.getString(QUERY_CONFIG);
     }
 
     @Override
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
index b29418f..a54d5b0 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
@@ -18,6 +18,7 @@ under the License.
 */
 package org.apache.plc4x.kafka;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
@@ -33,8 +34,6 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
 public class Plc4xSinkTask extends SinkTask {
-    private final static String FIELD_KEY = "key"; // TODO: is this really necessary?
-
     private String url;
     private String query;
 
@@ -48,8 +47,9 @@ public class Plc4xSinkTask extends SinkTask {
 
     @Override
     public void start(Map<String, String> props) {
-        url = props.get(Plc4xSinkConnector.URL_CONFIG);
-        query = props.get(Plc4xSinkConnector.QUERY_CONFIG);
+        AbstractConfig config = new AbstractConfig(Plc4xSinkConnector.CONFIG_DEF, props);
+        url = config.getString(Plc4xSinkConnector.URL_CONFIG);
+        query = config.getString(Plc4xSinkConnector.QUERY_CONFIG);
 
         openConnection();
 
@@ -66,7 +66,7 @@ public class Plc4xSinkTask extends SinkTask {
     public void put(Collection<SinkRecord> records) {
         for (SinkRecord record: records) {
             String value = record.value().toString(); // TODO: implement other data types
-            PlcWriteRequest plcRequest = plcWriter.writeRequestBuilder().addItem(FIELD_KEY, query, value).build();
+            PlcWriteRequest plcRequest = plcWriter.writeRequestBuilder().addItem(query, query, value).build();
             doWrite(plcRequest);
         }
     }
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
index 4d1d9d0..4d014a5 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
@@ -18,13 +18,15 @@ under the License.
 */
 package org.apache.plc4x.kafka;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.util.ConnectorUtils;
 import org.apache.plc4x.kafka.util.VersionUtil;
 
-import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -35,22 +37,22 @@ public class Plc4xSourceConnector extends SourceConnector {
     static final String URL_CONFIG = "url";
     private static final String URL_DOC = "Connection string used by PLC4X to connect to the PLC";
 
-    static final String QUERY_CONFIG = "query";
-    private static final String QUERY_DOC = "Field query to be sent to the PLC";
+    static final String QUERIES_CONFIG = "queries";
+    private static final String QUERIES_DOC = "Field queries to be sent to the PLC";
 
     static final String RATE_CONFIG = "rate";
     private static final Integer RATE_DEFAULT = 1000;
     private static final String RATE_DOC = "Polling rate";
 
-    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+    static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC)
         .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC)
-        .define(QUERY_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, QUERY_DOC)
+        .define(QUERIES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, QUERIES_DOC)
         .define(RATE_CONFIG, ConfigDef.Type.INT, RATE_DEFAULT, ConfigDef.Importance.MEDIUM, RATE_DOC);
 
     private String topic;
     private String url;
-    private String query;
+    private List<String> queries;
     private Integer rate;
 
     @Override
@@ -60,22 +62,26 @@ public class Plc4xSourceConnector extends SourceConnector {
 
     @Override
     public List<Map<String, String>> taskConfigs(int maxTasks) {
-        Map<String, String> taskConfig = new HashMap<>();
-        taskConfig.put(TOPIC_CONFIG, topic);
-        taskConfig.put(URL_CONFIG, url);
-        taskConfig.put(QUERY_CONFIG, query);
-        taskConfig.put(RATE_CONFIG, rate.toString());
-
-        // Only one task will be created; ignoring maxTasks for now
-        return Collections.singletonList(taskConfig);
+        List<Map<String, String>> configs = new LinkedList<>();
+        List<List<String>> queryGroups = ConnectorUtils.groupPartitions(queries, maxTasks);
+        for (List<String> queryGroup: queryGroups) {
+            Map<String, String> taskConfig = new HashMap<>();
+            taskConfig.put(TOPIC_CONFIG, topic);
+            taskConfig.put(URL_CONFIG, url);
+            taskConfig.put(QUERIES_CONFIG, String.join(",", queryGroup));
+            taskConfig.put(RATE_CONFIG, rate.toString());
+            configs.add(taskConfig);
+        }
+        return configs;
     }
 
     @Override
     public void start(Map<String, String> props) {
-        topic = props.get(TOPIC_CONFIG);
-        url = props.get(URL_CONFIG);
-        query = props.get(QUERY_CONFIG);
-        rate = Integer.valueOf(props.get(RATE_CONFIG));
+        AbstractConfig config = new AbstractConfig(Plc4xSourceConnector.CONFIG_DEF, props);
+        topic = config.getString(TOPIC_CONFIG);
+        url = config.getString(URL_CONFIG);
+        queries = config.getList(QUERIES_CONFIG);
+        rate = config.getInt(RATE_CONFIG);
     }
 
     @Override
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
index 798ae31..c354a1e 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
@@ -18,6 +18,7 @@ under the License.
 */
 package org.apache.plc4x.kafka;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -32,6 +33,7 @@ import org.apache.plc4x.java.api.types.PlcResponseCode;
 import org.apache.plc4x.kafka.util.VersionUtil;
 
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.*;
@@ -45,11 +47,10 @@ import java.util.concurrent.*;
 public class Plc4xSourceTask extends SourceTask {
     private final static long WAIT_LIMIT_MILLIS = 100;
     private final static long TIMEOUT_LIMIT_MILLIS = 5000;
-    private final static String FIELD_KEY = "key"; // TODO: is this really necessary?
 
     private String topic;
     private String url;
-    private String query;
+    private List<String> queries;
 
     private PlcConnection plcConnection;
     private PlcReader plcReader;
@@ -67,16 +68,22 @@ public class Plc4xSourceTask extends SourceTask {
 
     @Override
     public void start(Map<String, String> props) {
-        topic = props.get(Plc4xSourceConnector.TOPIC_CONFIG);
-        url = props.get(Plc4xSourceConnector.URL_CONFIG);
-        query = props.get(Plc4xSourceConnector.QUERY_CONFIG);
+        AbstractConfig config = new AbstractConfig(Plc4xSourceConnector.CONFIG_DEF, props);
+        topic = config.getString(Plc4xSourceConnector.TOPIC_CONFIG);
+        url = config.getString(Plc4xSourceConnector.URL_CONFIG);
+        queries = config.getList(Plc4xSourceConnector.QUERIES_CONFIG);
 
         openConnection();
 
         plcReader = plcConnection.getReader()
             .orElseThrow(() -> new ConnectException("PlcReader not available for this type of connection"));
 
-        plcRequest = plcReader.readRequestBuilder().addItem(FIELD_KEY, query).build();
+
+        PlcReadRequest.Builder builder = plcReader.readRequestBuilder();
+        for (String query : queries) {
+            builder.addItem(query, query);
+        }
+        plcRequest = builder.build();
 
         int rate = Integer.valueOf(props.get(Plc4xSourceConnector.RATE_CONFIG));
         scheduler = Executors.newScheduledThreadPool(1);
@@ -152,30 +159,35 @@ public class Plc4xSourceTask extends SourceTask {
     }
 
     private List<SourceRecord> extractValues(PlcReadResponse<?> response) {
-        final PlcResponseCode rc = response.getResponseCode(FIELD_KEY);
-
-        if (!rc.equals(PlcResponseCode.OK))
-            return null; // TODO: should we really ignore this?
-
-        Object rawValue = response.getObject(FIELD_KEY);
-        Schema valueSchema = getSchema(rawValue.getClass());
-        Object value = valueSchema.equals(Schema.STRING_SCHEMA) ? rawValue.toString() : rawValue;
-        Long timestamp = System.currentTimeMillis();
-        Map<String, String> sourcePartition = Collections.singletonMap("url", url);
-        Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);
-
-        SourceRecord record =
-            new SourceRecord(
-                sourcePartition,
-                sourceOffset,
-                topic,
-                Schema.STRING_SCHEMA,
-                query,
-                valueSchema,
-                value
-            );
-
-        return Collections.singletonList(record); // TODO: what if there are multiple values?
+        final List<SourceRecord> result = new LinkedList<>();
+        for (String query : queries) {
+            final PlcResponseCode rc = response.getResponseCode(query);
+            if (!rc.equals(PlcResponseCode.OK))  {
+                continue;
+            }
+
+            Object rawValue = response.getObject(query);
+            Schema valueSchema = getSchema(rawValue.getClass());
+            Object value = valueSchema.equals(Schema.STRING_SCHEMA) ? rawValue.toString() : rawValue;
+            Long timestamp = System.currentTimeMillis();
+            Map<String, String> sourcePartition = Collections.singletonMap("url", url);
+            Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);
+
+            SourceRecord record =
+                new SourceRecord(
+                    sourcePartition,
+                    sourceOffset,
+                    topic,
+                    Schema.STRING_SCHEMA,
+                    query,
+                    valueSchema,
+                    value
+                );
+
+            result.add(record);
+        }
+
+        return result;
     }
 
     private Schema getSchema(Class<?> type) {