You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2018/09/10 16:20:21 UTC
[incubator-plc4x] 07/08: implemented kafka sink connector
This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch skorikov-feature/api-redesign-chris-c
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
commit 955ad7432c1e47e5850e516fde01f4a9476ee3b4
Author: Andrey Skorikov <an...@codecentric.de>
AuthorDate: Mon Sep 10 16:26:54 2018 +0200
implemented kafka sink connector
---
.../src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java | 9 ++++++++-
.../src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java | 3 ++-
plc4j/protocols/pom.xml | 2 +-
.../src/main/java/org/apache/plc4x/java/test/TestDevice.java | 7 ++++++-
.../test/src/main/java/org/apache/plc4x/java/test/TestField.java | 4 +++-
.../test/src/main/java/org/apache/plc4x/java/test/TestType.java | 3 ++-
6 files changed, 22 insertions(+), 6 deletions(-)
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
index 6646dac..45ae926 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
@@ -32,10 +32,15 @@ public class Plc4xSinkConnector extends SinkConnector {
static final String URL_CONFIG = "url";
private static final String URL_DOC = "Connection string used by PLC4X to connect to the PLC";
+ static final String QUERY_CONFIG = "query";
+ private static final String QUERY_DOC = "Field query to be sent to the PLC";
+
private static final ConfigDef CONFIG_DEF = new ConfigDef()
- .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC);
+ .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC)
+ .define(QUERY_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, QUERY_DOC);
private String url;
+ private String query;
@Override
public Class<? extends Task> taskClass() {
@@ -46,6 +51,7 @@ public class Plc4xSinkConnector extends SinkConnector {
public List<Map<String, String>> taskConfigs(int maxTasks) {
Map<String, String> taskConfig = new HashMap<>();
taskConfig.put(URL_CONFIG, url);
+ taskConfig.put(QUERY_CONFIG, query);
// Only one task will be created; ignoring maxTasks for now
return Collections.singletonList(taskConfig);
@@ -54,6 +60,7 @@ public class Plc4xSinkConnector extends SinkConnector {
@Override
public void start(Map<String, String> props) {
url = props.get(URL_CONFIG);
+ query = props.get(QUERY_CONFIG);
}
@Override
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
index a27b08a..b29418f 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
@@ -36,6 +36,7 @@ public class Plc4xSinkTask extends SinkTask {
private final static String FIELD_KEY = "key"; // TODO: is this really necessary?
private String url;
+ private String query;
private PlcConnection plcConnection;
private PlcWriter plcWriter;
@@ -48,6 +49,7 @@ public class Plc4xSinkTask extends SinkTask {
@Override
public void start(Map<String, String> props) {
url = props.get(Plc4xSinkConnector.URL_CONFIG);
+ query = props.get(Plc4xSinkConnector.QUERY_CONFIG);
openConnection();
@@ -63,7 +65,6 @@ public class Plc4xSinkTask extends SinkTask {
@Override
public void put(Collection<SinkRecord> records) {
for (SinkRecord record: records) {
- String query = record.key().toString();
String value = record.value().toString(); // TODO: implement other data types
PlcWriteRequest plcRequest = plcWriter.writeRequestBuilder().addItem(FIELD_KEY, query, value).build();
doWrite(plcRequest);
diff --git a/plc4j/protocols/pom.xml b/plc4j/protocols/pom.xml
index ffb7d3e..734c3e6 100644
--- a/plc4j/protocols/pom.xml
+++ b/plc4j/protocols/pom.xml
@@ -46,4 +46,4 @@
<module>benchmarks</module>
</modules>
-</project>
\ No newline at end of file
+</project>
diff --git a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java
index c046610..65cce8a 100644
--- a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java
+++ b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java
@@ -43,6 +43,8 @@ class TestDevice {
return Optional.ofNullable(state.get(field));
case RANDOM:
return Optional.of(randomValue(field.getDataType()));
+ case STDOUT:
+ return Optional.empty();
}
throw new IllegalArgumentException("Unsupported field type: " + field.getType().name());
}
@@ -52,9 +54,12 @@ class TestDevice {
switch(field.getType()) {
case STATE:
state.put(field, value);
+ return;
+ case STDOUT:
+ System.out.printf("TEST PLC: %s%n", value.getObject(0).toString());
+ return;
case RANDOM:
// Just ignore this ...
- return;
}
throw new IllegalArgumentException("Unsupported field type: " + field.getType().name());
}
diff --git a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestField.java b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestField.java
index c8215dc..2488282 100644
--- a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestField.java
+++ b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestField.java
@@ -31,7 +31,9 @@ import java.util.regex.Pattern;
class TestField implements PlcField {
/**
- * Example: {@code RANDOM/foo:INTEGER}
+ * Examples:
+ * - {@code RANDOM/foo:INTEGER}
+ * - {@code STDOUT/foo:STRING}
*/
private static final Pattern ADDRESS_PATTERN = Pattern.compile("^(?<type>\\w+)/(?<name>\\w+):(?<dataType>.+)(\\[(?<numElements>\\d)])?$");
diff --git a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestType.java b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestType.java
index 6483780..6654bb0 100644
--- a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestType.java
+++ b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestType.java
@@ -22,6 +22,7 @@ package org.apache.plc4x.java.test;
public enum TestType {
RANDOM,
- STATE
+ STATE,
+ STDOUT
}