You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2018/10/17 16:48:25 UTC
[incubator-plc4x] 18/19: allow multiple connections in kafka connector
This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
commit 48dbad00b18c59c70bbe5e94160ec7a1f04b4c6a
Author: Andrey Skorikov <an...@codecentric.de>
AuthorDate: Fri Oct 12 13:22:02 2018 +0200
allow multiple connections in kafka connector
---
integrations/apache-kafka/config/source.properties | 3 +-
.../apache/plc4x/kafka/Plc4xSourceConnector.java | 45 +++++++++++-----------
.../org/apache/plc4x/kafka/Plc4xSourceTask.java | 30 ++++++++++++---
3 files changed, 48 insertions(+), 30 deletions(-)
diff --git a/integrations/apache-kafka/config/source.properties b/integrations/apache-kafka/config/source.properties
index cbd00f5..afa7e93 100644
--- a/integrations/apache-kafka/config/source.properties
+++ b/integrations/apache-kafka/config/source.properties
@@ -19,6 +19,5 @@ limitations under the License.
name=plc-source-test
connector.class=org.apache.plc4x.kafka.Plc4xSourceConnector
topic=test
-url=test:unused
-queries=RANDOM/foo:INTEGER,RANDOM/bar:STRING
+queries=test:unused#RANDOM/foo:INTEGER,test:another#RANDOM/bar:STRING
rate=2000
\ No newline at end of file
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
index 4d014a5..bb1392e 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
@@ -22,36 +22,28 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
-import org.apache.kafka.connect.util.ConnectorUtils;
import org.apache.plc4x.kafka.util.VersionUtil;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+import java.util.stream.Collectors;
public class Plc4xSourceConnector extends SourceConnector {
- static final String TOPIC_CONFIG = "topic";
+ private static final String TOPIC_CONFIG = "topic";
private static final String TOPIC_DOC = "Kafka topic to publish to";
- static final String URL_CONFIG = "url";
- private static final String URL_DOC = "Connection string used by PLC4X to connect to the PLC";
-
- static final String QUERIES_CONFIG = "queries";
+ private static final String QUERIES_CONFIG = "queries";
private static final String QUERIES_DOC = "Field queries to be sent to the PLC";
- static final String RATE_CONFIG = "rate";
+ private static final String RATE_CONFIG = "rate";
private static final Integer RATE_DEFAULT = 1000;
private static final String RATE_DOC = "Polling rate";
- static final ConfigDef CONFIG_DEF = new ConfigDef()
+ private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC)
- .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC)
.define(QUERIES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, QUERIES_DOC)
.define(RATE_CONFIG, ConfigDef.Type.INT, RATE_DEFAULT, ConfigDef.Importance.MEDIUM, RATE_DOC);
private String topic;
- private String url;
private List<String> queries;
private Integer rate;
@@ -63,23 +55,30 @@ public class Plc4xSourceConnector extends SourceConnector {
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> configs = new LinkedList<>();
- List<List<String>> queryGroups = ConnectorUtils.groupPartitions(queries, maxTasks);
- for (List<String> queryGroup: queryGroups) {
+ Map<String, List<String>> groupedByHost = new HashMap<>();
+ queries.stream().map(query -> query.split("#", 2)).collect(Collectors.groupingBy(parts -> parts[0])).forEach((host, queries) -> {
+ groupedByHost.put(host, queries.stream().map(parts -> parts[1]).collect(Collectors.toList()));
+ });
+ if (groupedByHost.size() > maxTasks) {
+ // Not enough tasks
+ // TODO: throw exception?
+ return Collections.emptyList();
+ }
+ groupedByHost.forEach((host, qs) -> {
Map<String, String> taskConfig = new HashMap<>();
- taskConfig.put(TOPIC_CONFIG, topic);
- taskConfig.put(URL_CONFIG, url);
- taskConfig.put(QUERIES_CONFIG, String.join(",", queryGroup));
- taskConfig.put(RATE_CONFIG, rate.toString());
+ taskConfig.put(Plc4xSourceTask.TOPIC_CONFIG, topic);
+ taskConfig.put(Plc4xSourceTask.URL_CONFIG, host);
+ taskConfig.put(Plc4xSourceTask.QUERIES_CONFIG, String.join(",", qs));
+ taskConfig.put(Plc4xSourceTask.RATE_CONFIG, rate.toString());
configs.add(taskConfig);
- }
+ });
return configs;
}
@Override
public void start(Map<String, String> props) {
- AbstractConfig config = new AbstractConfig(Plc4xSourceConnector.CONFIG_DEF, props);
+ AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
topic = config.getString(TOPIC_CONFIG);
- url = config.getString(URL_CONFIG);
queries = config.getList(QUERIES_CONFIG);
rate = config.getInt(RATE_CONFIG);
}
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
index 2bbc56b..f172a38 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
@@ -19,6 +19,7 @@ under the License.
package org.apache.plc4x.kafka;
import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
@@ -43,6 +44,25 @@ import java.util.concurrent.*;
* If the flag does not become true, the method returns null, otherwise a fetch is performed.
*/
public class Plc4xSourceTask extends SourceTask {
+ static final String TOPIC_CONFIG = "topic";
+ private static final String TOPIC_DOC = "Kafka topic to publish to";
+
+ static final String URL_CONFIG = "url";
+ private static final String URL_DOC = "PLC URL";
+
+ static final String QUERIES_CONFIG = "queries";
+ private static final String QUERIES_DOC = "Field queries to be sent to the PLC";
+
+ static final String RATE_CONFIG = "rate";
+ private static final Integer RATE_DEFAULT = 1000;
+ private static final String RATE_DOC = "Polling rate";
+
+ private static final ConfigDef CONFIG_DEF = new ConfigDef()
+ .define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC)
+ .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC)
+ .define(QUERIES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, QUERIES_DOC)
+ .define(RATE_CONFIG, ConfigDef.Type.INT, RATE_DEFAULT, ConfigDef.Importance.MEDIUM, RATE_DOC);
+
private final static long WAIT_LIMIT_MILLIS = 100;
private final static long TIMEOUT_LIMIT_MILLIS = 5000;
@@ -72,10 +92,10 @@ public class Plc4xSourceTask extends SourceTask {
@Override
public void start(Map<String, String> props) {
- AbstractConfig config = new AbstractConfig(Plc4xSourceConnector.CONFIG_DEF, props);
- topic = config.getString(Plc4xSourceConnector.TOPIC_CONFIG);
- url = config.getString(Plc4xSourceConnector.URL_CONFIG);
- queries = config.getList(Plc4xSourceConnector.QUERIES_CONFIG);
+ AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
+ topic = config.getString(TOPIC_CONFIG);
+ url = config.getString(URL_CONFIG);
+ queries = config.getList(QUERIES_CONFIG);
openConnection();
@@ -83,7 +103,7 @@ public class Plc4xSourceTask extends SourceTask {
throw new ConnectException("Reading not supported on this connection");
}
- int rate = Integer.valueOf(props.get(Plc4xSourceConnector.RATE_CONFIG));
+ int rate = Integer.valueOf(props.get(RATE_CONFIG));
scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(Plc4xSourceTask.this::scheduleFetch, rate, rate, TimeUnit.MILLISECONDS);
}