You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2018/09/17 07:07:48 UTC
[incubator-plc4x] 04/29: implemented support for multiple workers in kafka sink connector
This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
commit adf6b6735317b5aa306b06f461e09806ddd54769
Author: Andrey Skorikov <an...@codecentric.de>
AuthorDate: Tue Sep 11 14:23:32 2018 +0200
implemented support for multiple workers in kafka sink connector
---
.../org/apache/plc4x/kafka/Plc4xSinkConnector.java | 25 ++++++++--------------
.../java/org/apache/plc4x/kafka/Plc4xSinkTask.java | 23 ++++++++++++++++----
.../org/apache/plc4x/java/test/TestDevice.java | 7 +++---
.../apache/plc4x/java/test/TestFieldHandler.java | 7 +++---
4 files changed, 36 insertions(+), 26 deletions(-)
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
index 1899208..fa2e32d 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
@@ -24,21 +24,14 @@ import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.plc4x.kafka.util.VersionUtil;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
public class Plc4xSinkConnector extends SinkConnector {
static final String URL_CONFIG = "url";
private static final String URL_DOC = "Connection string used by PLC4X to connect to the PLC";
- static final String QUERY_CONFIG = "query";
- private static final String QUERY_DOC = "Field query to be sent to the PLC";
-
static final ConfigDef CONFIG_DEF = new ConfigDef()
- .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC)
- .define(QUERY_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, QUERY_DOC);
+ .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC);
private String url;
private String query;
@@ -50,19 +43,19 @@ public class Plc4xSinkConnector extends SinkConnector {
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
- Map<String, String> taskConfig = new HashMap<>();
- taskConfig.put(URL_CONFIG, url);
- taskConfig.put(QUERY_CONFIG, query);
-
- // Only one task will be created; ignoring maxTasks for now
- return Collections.singletonList(taskConfig);
+ List<Map<String, String>> configs = new LinkedList<>();
+ for (int i = 0; i < maxTasks; i++) {
+ Map<String, String> taskConfig = new HashMap<>();
+ taskConfig.put(URL_CONFIG, url);
+ configs.add(taskConfig);
+ }
+ return configs;
}
@Override
public void start(Map<String, String> props) {
AbstractConfig config = new AbstractConfig(Plc4xSinkConnector.CONFIG_DEF, props);
url = config.getString(URL_CONFIG);
- query = config.getString(QUERY_CONFIG);
}
@Override
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
index a54d5b0..648a32e 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
@@ -35,7 +35,6 @@ import java.util.concurrent.ExecutionException;
public class Plc4xSinkTask extends SinkTask {
private String url;
- private String query;
private PlcConnection plcConnection;
private PlcWriter plcWriter;
@@ -49,7 +48,6 @@ public class Plc4xSinkTask extends SinkTask {
public void start(Map<String, String> props) {
AbstractConfig config = new AbstractConfig(Plc4xSinkConnector.CONFIG_DEF, props);
url = config.getString(Plc4xSinkConnector.URL_CONFIG);
- query = config.getString(Plc4xSinkConnector.QUERY_CONFIG);
openConnection();
@@ -65,12 +63,29 @@ public class Plc4xSinkTask extends SinkTask {
@Override
public void put(Collection<SinkRecord> records) {
for (SinkRecord record: records) {
- String value = record.value().toString(); // TODO: implement other data types
- PlcWriteRequest plcRequest = plcWriter.writeRequestBuilder().addItem(query, query, value).build();
+ String query = record.key().toString();
+ Object value = record.value();
+ PlcWriteRequest.Builder builder = plcWriter.writeRequestBuilder();
+ PlcWriteRequest plcRequest = addToBuilder(builder, query, value).build();
doWrite(plcRequest);
}
}
+ // TODO: fix this
+ private PlcWriteRequest.Builder addToBuilder(PlcWriteRequest.Builder builder, String query, Object obj) {
+ Class<?> type = obj.getClass();
+
+ if (type.equals(Integer.class)) {
+ int value = (int) obj;
+ builder.addItem(query, query, value);
+ } else if (type.equals(String.class)) {
+ String value = (String) obj;
+ builder.addItem(query, query, value);
+ }
+
+ return builder;
+ }
+
private void openConnection() {
try {
plcConnection = new PlcDriverManager().getConnection(url);
diff --git a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java
index 65cce8a..3966deb 100644
--- a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java
+++ b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java
@@ -51,15 +51,16 @@ class TestDevice {
void set(TestField field, FieldItem value) {
Objects.requireNonNull(field);
- switch(field.getType()) {
+ switch (field.getType()) {
case STATE:
state.put(field, value);
return;
case STDOUT:
- System.out.printf("TEST PLC: %s%n", value.getObject(0).toString());
+ System.out.printf("TEST PLC STDOUT [%s]: %s%n", field.getName(), Objects.toString(value.getValues()[0]));
return;
case RANDOM:
- // Just ignore this ...
+ System.out.printf("TEST PLC RANDOM [%s]: %s%n", field.getName(), Objects.toString(value.getValues()[0]));
+ return;
}
throw new IllegalArgumentException("Unsupported field type: " + field.getType().name());
}
diff --git a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestFieldHandler.java b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestFieldHandler.java
index 7c9f286..5953da6 100644
--- a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestFieldHandler.java
+++ b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestFieldHandler.java
@@ -28,6 +28,7 @@ import org.apache.plc4x.java.base.messages.items.*;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
+import java.util.Arrays;
public class TestFieldHandler implements PlcFieldHandler {
@@ -52,7 +53,7 @@ public class TestFieldHandler implements PlcFieldHandler {
public FieldItem encodeByte(PlcField field, Object[] values) {
TestField testField = (TestField) field;
if(testField.getDataType() == Byte.class) {
- return new DefaultIntegerFieldItem((Long[]) values);
+ return new DefaultIntegerFieldItem(Arrays.stream(values).map(x -> new Long((Byte) x)).toArray(Long[]::new));
}
throw new PlcRuntimeException("Invalid encoder for type " + testField.getDataType().getName());
}
@@ -61,7 +62,7 @@ public class TestFieldHandler implements PlcFieldHandler {
public FieldItem encodeShort(PlcField field, Object[] values) {
TestField testField = (TestField) field;
if(testField.getDataType() == Short.class) {
- return new DefaultIntegerFieldItem((Long[]) values);
+ return new DefaultIntegerFieldItem(Arrays.stream(values).map(x -> new Long((Short) x)).toArray(Long[]::new));
}
throw new PlcRuntimeException("Invalid encoder for type " + testField.getDataType().getName());
}
@@ -70,7 +71,7 @@ public class TestFieldHandler implements PlcFieldHandler {
public FieldItem encodeInteger(PlcField field, Object[] values) {
TestField testField = (TestField) field;
if(testField.getDataType() == Integer.class) {
- return new DefaultIntegerFieldItem((Long[]) values);
+ return new DefaultIntegerFieldItem(Arrays.stream(values).map(x -> new Long((Integer) x)).toArray(Long[]::new));
}
throw new PlcRuntimeException("Invalid encoder for type " + testField.getDataType().getName());
}