You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2018/09/11 10:40:37 UTC
[incubator-plc4x] branch skorikov-feature/api-redesign-chris-c
updated: added support for multiple queries in kafka source connector
This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch skorikov-feature/api-redesign-chris-c
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
The following commit(s) were added to refs/heads/skorikov-feature/api-redesign-chris-c by this push:
new 4250310 added support for multiple queries in kafka source connector
new 130fa97 Merge branch 'skorikov-skorikov-feature/api-redesign-chris-c' into skorikov-feature/api-redesign-chris-c
4250310 is described below
commit 4250310b29384d5accae029d14f559ca9ab85e8c
Author: Andrey Skorikov <an...@codecentric.de>
AuthorDate: Tue Sep 11 11:59:05 2018 +0200
added support for multiple queries in kafka source connector
---
.../org/apache/plc4x/kafka/Plc4xSinkConnector.java | 8 ++-
.../java/org/apache/plc4x/kafka/Plc4xSinkTask.java | 10 +--
.../apache/plc4x/kafka/Plc4xSourceConnector.java | 42 +++++++------
.../org/apache/plc4x/kafka/Plc4xSourceTask.java | 72 +++++++++++++---------
4 files changed, 76 insertions(+), 56 deletions(-)
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
index 45ae926..1899208 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
@@ -18,6 +18,7 @@ under the License.
*/
package org.apache.plc4x.kafka;
+import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
@@ -35,7 +36,7 @@ public class Plc4xSinkConnector extends SinkConnector {
static final String QUERY_CONFIG = "query";
private static final String QUERY_DOC = "Field query to be sent to the PLC";
- private static final ConfigDef CONFIG_DEF = new ConfigDef()
+ static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC)
.define(QUERY_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, QUERY_DOC);
@@ -59,8 +60,9 @@ public class Plc4xSinkConnector extends SinkConnector {
@Override
public void start(Map<String, String> props) {
- url = props.get(URL_CONFIG);
- query = props.get(QUERY_CONFIG);
+ AbstractConfig config = new AbstractConfig(Plc4xSinkConnector.CONFIG_DEF, props);
+ url = config.getString(URL_CONFIG);
+ query = config.getString(QUERY_CONFIG);
}
@Override
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
index b29418f..a54d5b0 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
@@ -18,6 +18,7 @@ under the License.
*/
package org.apache.plc4x.kafka;
+import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
@@ -33,8 +34,6 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
public class Plc4xSinkTask extends SinkTask {
- private final static String FIELD_KEY = "key"; // TODO: is this really necessary?
-
private String url;
private String query;
@@ -48,8 +47,9 @@ public class Plc4xSinkTask extends SinkTask {
@Override
public void start(Map<String, String> props) {
- url = props.get(Plc4xSinkConnector.URL_CONFIG);
- query = props.get(Plc4xSinkConnector.QUERY_CONFIG);
+ AbstractConfig config = new AbstractConfig(Plc4xSinkConnector.CONFIG_DEF, props);
+ url = config.getString(Plc4xSinkConnector.URL_CONFIG);
+ query = config.getString(Plc4xSinkConnector.QUERY_CONFIG);
openConnection();
@@ -66,7 +66,7 @@ public class Plc4xSinkTask extends SinkTask {
public void put(Collection<SinkRecord> records) {
for (SinkRecord record: records) {
String value = record.value().toString(); // TODO: implement other data types
- PlcWriteRequest plcRequest = plcWriter.writeRequestBuilder().addItem(FIELD_KEY, query, value).build();
+ PlcWriteRequest plcRequest = plcWriter.writeRequestBuilder().addItem(query, query, value).build();
doWrite(plcRequest);
}
}
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
index 4d1d9d0..4d014a5 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
@@ -18,13 +18,15 @@ under the License.
*/
package org.apache.plc4x.kafka;
+import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.util.ConnectorUtils;
import org.apache.plc4x.kafka.util.VersionUtil;
-import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -35,22 +37,22 @@ public class Plc4xSourceConnector extends SourceConnector {
static final String URL_CONFIG = "url";
private static final String URL_DOC = "Connection string used by PLC4X to connect to the PLC";
- static final String QUERY_CONFIG = "query";
- private static final String QUERY_DOC = "Field query to be sent to the PLC";
+ static final String QUERIES_CONFIG = "queries";
+ private static final String QUERIES_DOC = "Field queries to be sent to the PLC";
static final String RATE_CONFIG = "rate";
private static final Integer RATE_DEFAULT = 1000;
private static final String RATE_DOC = "Polling rate";
- private static final ConfigDef CONFIG_DEF = new ConfigDef()
+ static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC)
.define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC)
- .define(QUERY_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, QUERY_DOC)
+ .define(QUERIES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, QUERIES_DOC)
.define(RATE_CONFIG, ConfigDef.Type.INT, RATE_DEFAULT, ConfigDef.Importance.MEDIUM, RATE_DOC);
private String topic;
private String url;
- private String query;
+ private List<String> queries;
private Integer rate;
@Override
@@ -60,22 +62,26 @@ public class Plc4xSourceConnector extends SourceConnector {
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
- Map<String, String> taskConfig = new HashMap<>();
- taskConfig.put(TOPIC_CONFIG, topic);
- taskConfig.put(URL_CONFIG, url);
- taskConfig.put(QUERY_CONFIG, query);
- taskConfig.put(RATE_CONFIG, rate.toString());
-
- // Only one task will be created; ignoring maxTasks for now
- return Collections.singletonList(taskConfig);
+ List<Map<String, String>> configs = new LinkedList<>();
+ List<List<String>> queryGroups = ConnectorUtils.groupPartitions(queries, maxTasks);
+ for (List<String> queryGroup: queryGroups) {
+ Map<String, String> taskConfig = new HashMap<>();
+ taskConfig.put(TOPIC_CONFIG, topic);
+ taskConfig.put(URL_CONFIG, url);
+ taskConfig.put(QUERIES_CONFIG, String.join(",", queryGroup));
+ taskConfig.put(RATE_CONFIG, rate.toString());
+ configs.add(taskConfig);
+ }
+ return configs;
}
@Override
public void start(Map<String, String> props) {
- topic = props.get(TOPIC_CONFIG);
- url = props.get(URL_CONFIG);
- query = props.get(QUERY_CONFIG);
- rate = Integer.valueOf(props.get(RATE_CONFIG));
+ AbstractConfig config = new AbstractConfig(Plc4xSourceConnector.CONFIG_DEF, props);
+ topic = config.getString(TOPIC_CONFIG);
+ url = config.getString(URL_CONFIG);
+ queries = config.getList(QUERIES_CONFIG);
+ rate = config.getInt(RATE_CONFIG);
}
@Override
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
index 798ae31..c354a1e 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
@@ -18,6 +18,7 @@ under the License.
*/
package org.apache.plc4x.kafka;
+import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
@@ -32,6 +33,7 @@ import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.kafka.util.VersionUtil;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
@@ -45,11 +47,10 @@ import java.util.concurrent.*;
public class Plc4xSourceTask extends SourceTask {
private final static long WAIT_LIMIT_MILLIS = 100;
private final static long TIMEOUT_LIMIT_MILLIS = 5000;
- private final static String FIELD_KEY = "key"; // TODO: is this really necessary?
private String topic;
private String url;
- private String query;
+ private List<String> queries;
private PlcConnection plcConnection;
private PlcReader plcReader;
@@ -67,16 +68,22 @@ public class Plc4xSourceTask extends SourceTask {
@Override
public void start(Map<String, String> props) {
- topic = props.get(Plc4xSourceConnector.TOPIC_CONFIG);
- url = props.get(Plc4xSourceConnector.URL_CONFIG);
- query = props.get(Plc4xSourceConnector.QUERY_CONFIG);
+ AbstractConfig config = new AbstractConfig(Plc4xSourceConnector.CONFIG_DEF, props);
+ topic = config.getString(Plc4xSourceConnector.TOPIC_CONFIG);
+ url = config.getString(Plc4xSourceConnector.URL_CONFIG);
+ queries = config.getList(Plc4xSourceConnector.QUERIES_CONFIG);
openConnection();
plcReader = plcConnection.getReader()
.orElseThrow(() -> new ConnectException("PlcReader not available for this type of connection"));
- plcRequest = plcReader.readRequestBuilder().addItem(FIELD_KEY, query).build();
+
+ PlcReadRequest.Builder builder = plcReader.readRequestBuilder();
+ for (String query : queries) {
+ builder.addItem(query, query);
+ }
+ plcRequest = builder.build();
int rate = Integer.valueOf(props.get(Plc4xSourceConnector.RATE_CONFIG));
scheduler = Executors.newScheduledThreadPool(1);
@@ -152,30 +159,35 @@ public class Plc4xSourceTask extends SourceTask {
}
private List<SourceRecord> extractValues(PlcReadResponse<?> response) {
- final PlcResponseCode rc = response.getResponseCode(FIELD_KEY);
-
- if (!rc.equals(PlcResponseCode.OK))
- return null; // TODO: should we really ignore this?
-
- Object rawValue = response.getObject(FIELD_KEY);
- Schema valueSchema = getSchema(rawValue.getClass());
- Object value = valueSchema.equals(Schema.STRING_SCHEMA) ? rawValue.toString() : rawValue;
- Long timestamp = System.currentTimeMillis();
- Map<String, String> sourcePartition = Collections.singletonMap("url", url);
- Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);
-
- SourceRecord record =
- new SourceRecord(
- sourcePartition,
- sourceOffset,
- topic,
- Schema.STRING_SCHEMA,
- query,
- valueSchema,
- value
- );
-
- return Collections.singletonList(record); // TODO: what if there are multiple values?
+ final List<SourceRecord> result = new LinkedList<>();
+ for (String query : queries) {
+ final PlcResponseCode rc = response.getResponseCode(query);
+ if (!rc.equals(PlcResponseCode.OK)) {
+ continue;
+ }
+
+ Object rawValue = response.getObject(query);
+ Schema valueSchema = getSchema(rawValue.getClass());
+ Object value = valueSchema.equals(Schema.STRING_SCHEMA) ? rawValue.toString() : rawValue;
+ Long timestamp = System.currentTimeMillis();
+ Map<String, String> sourcePartition = Collections.singletonMap("url", url);
+ Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);
+
+ SourceRecord record =
+ new SourceRecord(
+ sourcePartition,
+ sourceOffset,
+ topic,
+ Schema.STRING_SCHEMA,
+ query,
+ valueSchema,
+ value
+ );
+
+ result.add(record);
+ }
+
+ return result;
}
private Schema getSchema(Class<?> type) {