You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@plc4x.apache.org by GitBox <gi...@apache.org> on 2018/09/11 10:40:41 UTC

[GitHub] asfgit closed pull request #17: Add support for multiple queries in kafka source connector

asfgit closed pull request #17: Add support for multiple queries in kafka source connector
URL: https://github.com/apache/incubator-plc4x/pull/17
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not show its
diff once it has been merged, so the diff is reproduced below:

diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
index 45ae926eb..189920886 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
@@ -18,6 +18,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.kafka;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.sink.SinkConnector;
@@ -35,7 +36,7 @@ Licensed to the Apache Software Foundation (ASF) under one
     static final String QUERY_CONFIG = "query";
     private static final String QUERY_DOC = "Field query to be sent to the PLC";
 
-    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+    static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC)
         .define(QUERY_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, QUERY_DOC);
 
@@ -59,8 +60,9 @@ Licensed to the Apache Software Foundation (ASF) under one
 
     @Override
     public void start(Map<String, String> props) {
-        url = props.get(URL_CONFIG);
-        query = props.get(QUERY_CONFIG);
+        AbstractConfig config = new AbstractConfig(Plc4xSinkConnector.CONFIG_DEF, props);
+        url = config.getString(URL_CONFIG);
+        query = config.getString(QUERY_CONFIG);
     }
 
     @Override
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
index b29418f18..a54d5b08b 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
@@ -18,6 +18,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.kafka;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
@@ -33,8 +34,6 @@ Licensed to the Apache Software Foundation (ASF) under one
 import java.util.concurrent.ExecutionException;
 
 public class Plc4xSinkTask extends SinkTask {
-    private final static String FIELD_KEY = "key"; // TODO: is this really necessary?
-
     private String url;
     private String query;
 
@@ -48,8 +47,9 @@ public String version() {
 
     @Override
     public void start(Map<String, String> props) {
-        url = props.get(Plc4xSinkConnector.URL_CONFIG);
-        query = props.get(Plc4xSinkConnector.QUERY_CONFIG);
+        AbstractConfig config = new AbstractConfig(Plc4xSinkConnector.CONFIG_DEF, props);
+        url = config.getString(Plc4xSinkConnector.URL_CONFIG);
+        query = config.getString(Plc4xSinkConnector.QUERY_CONFIG);
 
         openConnection();
 
@@ -66,7 +66,7 @@ public void stop() {
     public void put(Collection<SinkRecord> records) {
         for (SinkRecord record: records) {
             String value = record.value().toString(); // TODO: implement other data types
-            PlcWriteRequest plcRequest = plcWriter.writeRequestBuilder().addItem(FIELD_KEY, query, value).build();
+            PlcWriteRequest plcRequest = plcWriter.writeRequestBuilder().addItem(query, query, value).build();
             doWrite(plcRequest);
         }
     }
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
index 4d1d9d026..4d014a535 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
@@ -18,13 +18,15 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.kafka;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.util.ConnectorUtils;
 import org.apache.plc4x.kafka.util.VersionUtil;
 
-import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -35,22 +37,22 @@ Licensed to the Apache Software Foundation (ASF) under one
     static final String URL_CONFIG = "url";
     private static final String URL_DOC = "Connection string used by PLC4X to connect to the PLC";
 
-    static final String QUERY_CONFIG = "query";
-    private static final String QUERY_DOC = "Field query to be sent to the PLC";
+    static final String QUERIES_CONFIG = "queries";
+    private static final String QUERIES_DOC = "Field queries to be sent to the PLC";
 
     static final String RATE_CONFIG = "rate";
     private static final Integer RATE_DEFAULT = 1000;
     private static final String RATE_DOC = "Polling rate";
 
-    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+    static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC)
         .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC)
-        .define(QUERY_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, QUERY_DOC)
+        .define(QUERIES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, QUERIES_DOC)
         .define(RATE_CONFIG, ConfigDef.Type.INT, RATE_DEFAULT, ConfigDef.Importance.MEDIUM, RATE_DOC);
 
     private String topic;
     private String url;
-    private String query;
+    private List<String> queries;
     private Integer rate;
 
     @Override
@@ -60,22 +62,26 @@ Licensed to the Apache Software Foundation (ASF) under one
 
     @Override
     public List<Map<String, String>> taskConfigs(int maxTasks) {
-        Map<String, String> taskConfig = new HashMap<>();
-        taskConfig.put(TOPIC_CONFIG, topic);
-        taskConfig.put(URL_CONFIG, url);
-        taskConfig.put(QUERY_CONFIG, query);
-        taskConfig.put(RATE_CONFIG, rate.toString());
-
-        // Only one task will be created; ignoring maxTasks for now
-        return Collections.singletonList(taskConfig);
+        List<Map<String, String>> configs = new LinkedList<>();
+        List<List<String>> queryGroups = ConnectorUtils.groupPartitions(queries, maxTasks);
+        for (List<String> queryGroup: queryGroups) {
+            Map<String, String> taskConfig = new HashMap<>();
+            taskConfig.put(TOPIC_CONFIG, topic);
+            taskConfig.put(URL_CONFIG, url);
+            taskConfig.put(QUERIES_CONFIG, String.join(",", queryGroup));
+            taskConfig.put(RATE_CONFIG, rate.toString());
+            configs.add(taskConfig);
+        }
+        return configs;
     }
 
     @Override
     public void start(Map<String, String> props) {
-        topic = props.get(TOPIC_CONFIG);
-        url = props.get(URL_CONFIG);
-        query = props.get(QUERY_CONFIG);
-        rate = Integer.valueOf(props.get(RATE_CONFIG));
+        AbstractConfig config = new AbstractConfig(Plc4xSourceConnector.CONFIG_DEF, props);
+        topic = config.getString(TOPIC_CONFIG);
+        url = config.getString(URL_CONFIG);
+        queries = config.getList(QUERIES_CONFIG);
+        rate = config.getInt(RATE_CONFIG);
     }
 
     @Override
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
index 798ae3113..c354a1ea2 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
@@ -18,6 +18,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.kafka;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -32,6 +33,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.plc4x.kafka.util.VersionUtil;
 
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.*;
@@ -45,11 +47,10 @@ Licensed to the Apache Software Foundation (ASF) under one
 public class Plc4xSourceTask extends SourceTask {
     private final static long WAIT_LIMIT_MILLIS = 100;
     private final static long TIMEOUT_LIMIT_MILLIS = 5000;
-    private final static String FIELD_KEY = "key"; // TODO: is this really necessary?
 
     private String topic;
     private String url;
-    private String query;
+    private List<String> queries;
 
     private PlcConnection plcConnection;
     private PlcReader plcReader;
@@ -67,16 +68,22 @@ public String version() {
 
     @Override
     public void start(Map<String, String> props) {
-        topic = props.get(Plc4xSourceConnector.TOPIC_CONFIG);
-        url = props.get(Plc4xSourceConnector.URL_CONFIG);
-        query = props.get(Plc4xSourceConnector.QUERY_CONFIG);
+        AbstractConfig config = new AbstractConfig(Plc4xSourceConnector.CONFIG_DEF, props);
+        topic = config.getString(Plc4xSourceConnector.TOPIC_CONFIG);
+        url = config.getString(Plc4xSourceConnector.URL_CONFIG);
+        queries = config.getList(Plc4xSourceConnector.QUERIES_CONFIG);
 
         openConnection();
 
         plcReader = plcConnection.getReader()
             .orElseThrow(() -> new ConnectException("PlcReader not available for this type of connection"));
 
-        plcRequest = plcReader.readRequestBuilder().addItem(FIELD_KEY, query).build();
+
+        PlcReadRequest.Builder builder = plcReader.readRequestBuilder();
+        for (String query : queries) {
+            builder.addItem(query, query);
+        }
+        plcRequest = builder.build();
 
         int rate = Integer.valueOf(props.get(Plc4xSourceConnector.RATE_CONFIG));
         scheduler = Executors.newScheduledThreadPool(1);
@@ -152,30 +159,35 @@ private synchronized boolean awaitFetch(long milliseconds) throws InterruptedExc
     }
 
     private List<SourceRecord> extractValues(PlcReadResponse<?> response) {
-        final PlcResponseCode rc = response.getResponseCode(FIELD_KEY);
-
-        if (!rc.equals(PlcResponseCode.OK))
-            return null; // TODO: should we really ignore this?
-
-        Object rawValue = response.getObject(FIELD_KEY);
-        Schema valueSchema = getSchema(rawValue.getClass());
-        Object value = valueSchema.equals(Schema.STRING_SCHEMA) ? rawValue.toString() : rawValue;
-        Long timestamp = System.currentTimeMillis();
-        Map<String, String> sourcePartition = Collections.singletonMap("url", url);
-        Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);
-
-        SourceRecord record =
-            new SourceRecord(
-                sourcePartition,
-                sourceOffset,
-                topic,
-                Schema.STRING_SCHEMA,
-                query,
-                valueSchema,
-                value
-            );
-
-        return Collections.singletonList(record); // TODO: what if there are multiple values?
+        final List<SourceRecord> result = new LinkedList<>();
+        for (String query : queries) {
+            final PlcResponseCode rc = response.getResponseCode(query);
+            if (!rc.equals(PlcResponseCode.OK))  {
+                continue;
+            }
+
+            Object rawValue = response.getObject(query);
+            Schema valueSchema = getSchema(rawValue.getClass());
+            Object value = valueSchema.equals(Schema.STRING_SCHEMA) ? rawValue.toString() : rawValue;
+            Long timestamp = System.currentTimeMillis();
+            Map<String, String> sourcePartition = Collections.singletonMap("url", url);
+            Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);
+
+            SourceRecord record =
+                new SourceRecord(
+                    sourcePartition,
+                    sourceOffset,
+                    topic,
+                    Schema.STRING_SCHEMA,
+                    query,
+                    valueSchema,
+                    value
+                );
+
+            result.add(record);
+        }
+
+        return result;
     }
 
     private Schema getSchema(Class<?> type) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services