You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2018/09/10 16:20:21 UTC

[incubator-plc4x] 07/08: implemented kafka sink connector

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch skorikov-feature/api-redesign-chris-c
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 955ad7432c1e47e5850e516fde01f4a9476ee3b4
Author: Andrey Skorikov <an...@codecentric.de>
AuthorDate: Mon Sep 10 16:26:54 2018 +0200

    implemented kafka sink connector
---
 .../src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java | 9 ++++++++-
 .../src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java      | 3 ++-
 plc4j/protocols/pom.xml                                          | 2 +-
 .../src/main/java/org/apache/plc4x/java/test/TestDevice.java     | 7 ++++++-
 .../test/src/main/java/org/apache/plc4x/java/test/TestField.java | 4 +++-
 .../test/src/main/java/org/apache/plc4x/java/test/TestType.java  | 3 ++-
 6 files changed, 22 insertions(+), 6 deletions(-)

diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
index 6646dac..45ae926 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
@@ -32,10 +32,15 @@ public class Plc4xSinkConnector extends SinkConnector {
     static final String URL_CONFIG = "url";
     private static final String URL_DOC = "Connection string used by PLC4X to connect to the PLC";
 
+    static final String QUERY_CONFIG = "query";
+    private static final String QUERY_DOC = "Field query to be sent to the PLC";
+
     private static final ConfigDef CONFIG_DEF = new ConfigDef()
-        .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC);
+        .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC)
+        .define(QUERY_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, QUERY_DOC);
 
     private String url;
+    private String query;
 
     @Override
     public Class<? extends Task> taskClass() {
@@ -46,6 +51,7 @@ public class Plc4xSinkConnector extends SinkConnector {
     public List<Map<String, String>> taskConfigs(int maxTasks) {
         Map<String, String> taskConfig = new HashMap<>();
         taskConfig.put(URL_CONFIG, url);
+        taskConfig.put(QUERY_CONFIG, query);
 
         // Only one task will be created; ignoring maxTasks for now
         return Collections.singletonList(taskConfig);
@@ -54,6 +60,7 @@ public class Plc4xSinkConnector extends SinkConnector {
     @Override
     public void start(Map<String, String> props) {
         url = props.get(URL_CONFIG);
+        query = props.get(QUERY_CONFIG);
     }
 
     @Override
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
index a27b08a..b29418f 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
@@ -36,6 +36,7 @@ public class Plc4xSinkTask extends SinkTask {
     private final static String FIELD_KEY = "key"; // TODO: is this really necessary?
 
     private String url;
+    private String query;
 
     private PlcConnection plcConnection;
     private PlcWriter plcWriter;
@@ -48,6 +49,7 @@ public class Plc4xSinkTask extends SinkTask {
     @Override
     public void start(Map<String, String> props) {
         url = props.get(Plc4xSinkConnector.URL_CONFIG);
+        query = props.get(Plc4xSinkConnector.QUERY_CONFIG);
 
         openConnection();
 
@@ -63,7 +65,6 @@ public class Plc4xSinkTask extends SinkTask {
     @Override
     public void put(Collection<SinkRecord> records) {
         for (SinkRecord record: records) {
-            String query = record.key().toString();
             String value = record.value().toString(); // TODO: implement other data types
             PlcWriteRequest plcRequest = plcWriter.writeRequestBuilder().addItem(FIELD_KEY, query, value).build();
             doWrite(plcRequest);
diff --git a/plc4j/protocols/pom.xml b/plc4j/protocols/pom.xml
index ffb7d3e..734c3e6 100644
--- a/plc4j/protocols/pom.xml
+++ b/plc4j/protocols/pom.xml
@@ -46,4 +46,4 @@
     <module>benchmarks</module>
   </modules>
 
-</project>
\ No newline at end of file
+</project>
diff --git a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java
index c046610..65cce8a 100644
--- a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java
+++ b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java
@@ -43,6 +43,8 @@ class TestDevice {
                 return Optional.ofNullable(state.get(field));
             case RANDOM:
                 return Optional.of(randomValue(field.getDataType()));
+            case STDOUT:
+                return Optional.empty();
         }
         throw new IllegalArgumentException("Unsupported field type: " + field.getType().name());
     }
@@ -52,9 +54,12 @@ class TestDevice {
         switch(field.getType()) {
             case STATE:
                 state.put(field, value);
+                return;
+            case STDOUT:
+                System.out.printf("TEST PLC: %s%n", value.getObject(0).toString());
+                return;
             case RANDOM:
                 // Just ignore this ...
-                return;
         }
         throw new IllegalArgumentException("Unsupported field type: " + field.getType().name());
     }
diff --git a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestField.java b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestField.java
index c8215dc..2488282 100644
--- a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestField.java
+++ b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestField.java
@@ -31,7 +31,9 @@ import java.util.regex.Pattern;
 class TestField implements PlcField {
 
     /**
-     * Example: {@code RANDOM/foo:INTEGER}
+     * Examples:
+     *  - {@code RANDOM/foo:INTEGER}
+     *  - {@code STDOUT/foo:STRING}
      */
     private static final Pattern ADDRESS_PATTERN = Pattern.compile("^(?<type>\\w+)/(?<name>\\w+):(?<dataType>.+)(\\[(?<numElements>\\d)])?$");
 
diff --git a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestType.java b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestType.java
index 6483780..6654bb0 100644
--- a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestType.java
+++ b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestType.java
@@ -22,6 +22,7 @@ package org.apache.plc4x.java.test;
 public enum TestType {
 
     RANDOM,
-    STATE
+    STATE,
+    STDOUT
 
 }