You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2018/10/17 16:48:25 UTC

[incubator-plc4x] 18/19: allow multiple connections in kafka connector

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 48dbad00b18c59c70bbe5e94160ec7a1f04b4c6a
Author: Andrey Skorikov <an...@codecentric.de>
AuthorDate: Fri Oct 12 13:22:02 2018 +0200

    allow multiple connections in kafka connector
---
 integrations/apache-kafka/config/source.properties |  3 +-
 .../apache/plc4x/kafka/Plc4xSourceConnector.java   | 45 +++++++++++-----------
 .../org/apache/plc4x/kafka/Plc4xSourceTask.java    | 30 ++++++++++++---
 3 files changed, 48 insertions(+), 30 deletions(-)

diff --git a/integrations/apache-kafka/config/source.properties b/integrations/apache-kafka/config/source.properties
index cbd00f5..afa7e93 100644
--- a/integrations/apache-kafka/config/source.properties
+++ b/integrations/apache-kafka/config/source.properties
@@ -19,6 +19,5 @@ limitations under the License.
 name=plc-source-test
 connector.class=org.apache.plc4x.kafka.Plc4xSourceConnector
 topic=test
-url=test:unused
-queries=RANDOM/foo:INTEGER,RANDOM/bar:STRING
+queries=test:unused#RANDOM/foo:INTEGER,test:another#RANDOM/bar:STRING
 rate=2000
\ No newline at end of file
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
index 4d014a5..bb1392e 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
@@ -22,36 +22,28 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
-import org.apache.kafka.connect.util.ConnectorUtils;
 import org.apache.plc4x.kafka.util.VersionUtil;
 
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+import java.util.stream.Collectors;
 
 public class Plc4xSourceConnector extends SourceConnector {
-    static final String TOPIC_CONFIG = "topic";
+    private static final String TOPIC_CONFIG = "topic";
     private static final String TOPIC_DOC = "Kafka topic to publish to";
 
-    static final String URL_CONFIG = "url";
-    private static final String URL_DOC = "Connection string used by PLC4X to connect to the PLC";
-
-    static final String QUERIES_CONFIG = "queries";
+    private static final String QUERIES_CONFIG = "queries";
     private static final String QUERIES_DOC = "Field queries to be sent to the PLC";
 
-    static final String RATE_CONFIG = "rate";
+    private static final String RATE_CONFIG = "rate";
     private static final Integer RATE_DEFAULT = 1000;
     private static final String RATE_DOC = "Polling rate";
 
-    static final ConfigDef CONFIG_DEF = new ConfigDef()
+    private static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC)
-        .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC)
         .define(QUERIES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, QUERIES_DOC)
         .define(RATE_CONFIG, ConfigDef.Type.INT, RATE_DEFAULT, ConfigDef.Importance.MEDIUM, RATE_DOC);
 
     private String topic;
-    private String url;
     private List<String> queries;
     private Integer rate;
 
@@ -63,23 +55,30 @@ public class Plc4xSourceConnector extends SourceConnector {
     @Override
     public List<Map<String, String>> taskConfigs(int maxTasks) {
         List<Map<String, String>> configs = new LinkedList<>();
-        List<List<String>> queryGroups = ConnectorUtils.groupPartitions(queries, maxTasks);
-        for (List<String> queryGroup: queryGroups) {
+        Map<String, List<String>> groupedByHost = new HashMap<>();
+        queries.stream().map(query -> query.split("#", 2)).collect(Collectors.groupingBy(parts -> parts[0])).forEach((host, queries) -> {
+            groupedByHost.put(host, queries.stream().map(parts -> parts[1]).collect(Collectors.toList()));
+        });
+        if (groupedByHost.size() > maxTasks) {
+            // Not enough tasks
+            // TODO: throw exception?
+            return Collections.emptyList();
+        }
+        groupedByHost.forEach((host, qs) -> {
             Map<String, String> taskConfig = new HashMap<>();
-            taskConfig.put(TOPIC_CONFIG, topic);
-            taskConfig.put(URL_CONFIG, url);
-            taskConfig.put(QUERIES_CONFIG, String.join(",", queryGroup));
-            taskConfig.put(RATE_CONFIG, rate.toString());
+            taskConfig.put(Plc4xSourceTask.TOPIC_CONFIG, topic);
+            taskConfig.put(Plc4xSourceTask.URL_CONFIG, host);
+            taskConfig.put(Plc4xSourceTask.QUERIES_CONFIG, String.join(",", qs));
+            taskConfig.put(Plc4xSourceTask.RATE_CONFIG, rate.toString());
             configs.add(taskConfig);
-        }
+        });
         return configs;
     }
 
     @Override
     public void start(Map<String, String> props) {
-        AbstractConfig config = new AbstractConfig(Plc4xSourceConnector.CONFIG_DEF, props);
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
         topic = config.getString(TOPIC_CONFIG);
-        url = config.getString(URL_CONFIG);
         queries = config.getList(QUERIES_CONFIG);
         rate = config.getInt(RATE_CONFIG);
     }
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
index 2bbc56b..f172a38 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
@@ -19,6 +19,7 @@ under the License.
 package org.apache.plc4x.kafka;
 
 import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
@@ -43,6 +44,25 @@ import java.util.concurrent.*;
  * If the flag does not become true, the method returns null, otherwise a fetch is performed.
  */
 public class Plc4xSourceTask extends SourceTask {
+    static final String TOPIC_CONFIG = "topic";
+    private static final String TOPIC_DOC = "Kafka topic to publish to";
+
+    static final String URL_CONFIG = "url";
+    private static final String URL_DOC = "PLC URL";
+
+    static final String QUERIES_CONFIG = "queries";
+    private static final String QUERIES_DOC = "Field queries to be sent to the PLC";
+
+    static final String RATE_CONFIG = "rate";
+    private static final Integer RATE_DEFAULT = 1000;
+    private static final String RATE_DOC = "Polling rate";
+
+    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+        .define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC)
+        .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC)
+        .define(QUERIES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, QUERIES_DOC)
+        .define(RATE_CONFIG, ConfigDef.Type.INT, RATE_DEFAULT, ConfigDef.Importance.MEDIUM, RATE_DOC);
+
     private final static long WAIT_LIMIT_MILLIS = 100;
     private final static long TIMEOUT_LIMIT_MILLIS = 5000;
 
@@ -72,10 +92,10 @@ public class Plc4xSourceTask extends SourceTask {
 
     @Override
     public void start(Map<String, String> props) {
-        AbstractConfig config = new AbstractConfig(Plc4xSourceConnector.CONFIG_DEF, props);
-        topic = config.getString(Plc4xSourceConnector.TOPIC_CONFIG);
-        url = config.getString(Plc4xSourceConnector.URL_CONFIG);
-        queries = config.getList(Plc4xSourceConnector.QUERIES_CONFIG);
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
+        topic = config.getString(TOPIC_CONFIG);
+        url = config.getString(URL_CONFIG);
+        queries = config.getList(QUERIES_CONFIG);
 
         openConnection();
 
@@ -83,7 +103,7 @@ public class Plc4xSourceTask extends SourceTask {
             throw new ConnectException("Reading not supported on this connection");
         }
 
-        int rate = Integer.valueOf(props.get(Plc4xSourceConnector.RATE_CONFIG));
+        int rate = Integer.valueOf(props.get(RATE_CONFIG));
         scheduler = Executors.newScheduledThreadPool(1);
         scheduler.scheduleAtFixedRate(Plc4xSourceTask.this::scheduleFetch, rate, rate, TimeUnit.MILLISECONDS);
     }