You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2018/09/17 07:07:48 UTC

[incubator-plc4x] 04/29: implemented support for multiple workers in kafka sink connector

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit adf6b6735317b5aa306b06f461e09806ddd54769
Author: Andrey Skorikov <an...@codecentric.de>
AuthorDate: Tue Sep 11 14:23:32 2018 +0200

    implemented support for multiple workers in kafka sink connector
---
 .../org/apache/plc4x/kafka/Plc4xSinkConnector.java | 25 ++++++++--------------
 .../java/org/apache/plc4x/kafka/Plc4xSinkTask.java | 23 ++++++++++++++++----
 .../org/apache/plc4x/java/test/TestDevice.java     |  7 +++---
 .../apache/plc4x/java/test/TestFieldHandler.java   |  7 +++---
 4 files changed, 36 insertions(+), 26 deletions(-)

diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
index 1899208..fa2e32d 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
@@ -24,21 +24,14 @@ import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.plc4x.kafka.util.VersionUtil;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 public class Plc4xSinkConnector extends SinkConnector {
     static final String URL_CONFIG = "url";
     private static final String URL_DOC = "Connection string used by PLC4X to connect to the PLC";
 
-    static final String QUERY_CONFIG = "query";
-    private static final String QUERY_DOC = "Field query to be sent to the PLC";
-
     static final ConfigDef CONFIG_DEF = new ConfigDef()
-        .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC)
-        .define(QUERY_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, QUERY_DOC);
+        .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC);
 
     private String url;
     private String query;
@@ -50,19 +43,19 @@ public class Plc4xSinkConnector extends SinkConnector {
 
     @Override
     public List<Map<String, String>> taskConfigs(int maxTasks) {
-        Map<String, String> taskConfig = new HashMap<>();
-        taskConfig.put(URL_CONFIG, url);
-        taskConfig.put(QUERY_CONFIG, query);
-
-        // Only one task will be created; ignoring maxTasks for now
-        return Collections.singletonList(taskConfig);
+        List<Map<String, String>> configs = new LinkedList<>();
+        for (int i = 0; i < maxTasks; i++) {
+            Map<String, String> taskConfig = new HashMap<>();
+            taskConfig.put(URL_CONFIG, url);
+            configs.add(taskConfig);
+        }
+        return configs;
     }
 
     @Override
     public void start(Map<String, String> props) {
         AbstractConfig config = new AbstractConfig(Plc4xSinkConnector.CONFIG_DEF, props);
         url = config.getString(URL_CONFIG);
-        query = config.getString(QUERY_CONFIG);
     }
 
     @Override
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
index a54d5b0..648a32e 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
@@ -35,7 +35,6 @@ import java.util.concurrent.ExecutionException;
 
 public class Plc4xSinkTask extends SinkTask {
     private String url;
-    private String query;
 
     private PlcConnection plcConnection;
     private PlcWriter plcWriter;
@@ -49,7 +48,6 @@ public class Plc4xSinkTask extends SinkTask {
     public void start(Map<String, String> props) {
         AbstractConfig config = new AbstractConfig(Plc4xSinkConnector.CONFIG_DEF, props);
         url = config.getString(Plc4xSinkConnector.URL_CONFIG);
-        query = config.getString(Plc4xSinkConnector.QUERY_CONFIG);
 
         openConnection();
 
@@ -65,12 +63,29 @@ public class Plc4xSinkTask extends SinkTask {
     @Override
     public void put(Collection<SinkRecord> records) {
         for (SinkRecord record: records) {
-            String value = record.value().toString(); // TODO: implement other data types
-            PlcWriteRequest plcRequest = plcWriter.writeRequestBuilder().addItem(query, query, value).build();
+            String query = record.key().toString();
+            Object value = record.value();
+            PlcWriteRequest.Builder builder = plcWriter.writeRequestBuilder();
+            PlcWriteRequest plcRequest = addToBuilder(builder, query, value).build();
             doWrite(plcRequest);
         }
     }
 
+    // TODO: handle value types other than Integer and String; for any other type no item is added to the builder
+    private PlcWriteRequest.Builder addToBuilder(PlcWriteRequest.Builder builder, String query, Object obj) {
+        Class<?> type = obj.getClass();
+
+        if (type.equals(Integer.class)) {
+            int value = (int) obj;
+            builder.addItem(query, query, value);
+        } else if (type.equals(String.class)) {
+            String value = (String) obj;
+            builder.addItem(query, query, value);
+        }
+
+        return builder;
+    }
+
     private void openConnection() {
         try {
             plcConnection = new PlcDriverManager().getConnection(url);
diff --git a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java
index 65cce8a..3966deb 100644
--- a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java
+++ b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java
@@ -51,15 +51,16 @@ class TestDevice {
 
     void set(TestField field, FieldItem value) {
         Objects.requireNonNull(field);
-        switch(field.getType()) {
+        switch (field.getType()) {
             case STATE:
                 state.put(field, value);
                 return;
             case STDOUT:
-                System.out.printf("TEST PLC: %s%n", value.getObject(0).toString());
+                System.out.printf("TEST PLC STDOUT [%s]: %s%n", field.getName(), Objects.toString(value.getValues()[0]));
                 return;
             case RANDOM:
-                // Just ignore this ...
+                System.out.printf("TEST PLC RANDOM [%s]: %s%n", field.getName(), Objects.toString(value.getValues()[0]));
+                return;
         }
         throw new IllegalArgumentException("Unsupported field type: " + field.getType().name());
     }
diff --git a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestFieldHandler.java b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestFieldHandler.java
index 7c9f286..5953da6 100644
--- a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestFieldHandler.java
+++ b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestFieldHandler.java
@@ -28,6 +28,7 @@ import org.apache.plc4x.java.base.messages.items.*;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.util.Arrays;
 
 public class TestFieldHandler implements PlcFieldHandler {
 
@@ -52,7 +53,7 @@ public class TestFieldHandler implements PlcFieldHandler {
     public FieldItem encodeByte(PlcField field, Object[] values) {
         TestField testField = (TestField) field;
         if(testField.getDataType() == Byte.class) {
-            return new DefaultIntegerFieldItem((Long[]) values);
+            return new DefaultIntegerFieldItem(Arrays.stream(values).map(x -> new Long((Byte) x)).toArray(Long[]::new));
         }
         throw new PlcRuntimeException("Invalid encoder for type " + testField.getDataType().getName());
     }
@@ -61,7 +62,7 @@ public class TestFieldHandler implements PlcFieldHandler {
     public FieldItem encodeShort(PlcField field, Object[] values) {
         TestField testField = (TestField) field;
         if(testField.getDataType() == Short.class) {
-            return new DefaultIntegerFieldItem((Long[]) values);
+            return new DefaultIntegerFieldItem(Arrays.stream(values).map(x -> new Long((Short) x)).toArray(Long[]::new));
         }
         throw new PlcRuntimeException("Invalid encoder for type " + testField.getDataType().getName());
     }
@@ -70,7 +71,7 @@ public class TestFieldHandler implements PlcFieldHandler {
     public FieldItem encodeInteger(PlcField field, Object[] values) {
         TestField testField = (TestField) field;
         if(testField.getDataType() == Integer.class) {
-            return new DefaultIntegerFieldItem((Long[]) values);
+            return new DefaultIntegerFieldItem(Arrays.stream(values).map(x -> new Long((Integer) x)).toArray(Long[]::new));
         }
         throw new PlcRuntimeException("Invalid encoder for type " + testField.getDataType().getName());
     }