You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2018/09/10 16:20:16 UTC

[incubator-plc4x] 02/08: implemented throttling

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch skorikov-feature/api-redesign-chris-c
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 1b0ca49c45bb91cebab09d820003ad8f89c4b040
Author: Andrey Skorikov <an...@codecentric.de>
AuthorDate: Mon Sep 10 11:50:00 2018 +0200

    implemented throttling
---
 .../apache/plc4x/kafka/Plc4xSourceConnector.java   | 10 ++-
 .../apache/plc4x/kafka/source/Plc4xSourceTask.java | 81 ++++++++++++++--------
 2 files changed, 62 insertions(+), 29 deletions(-)

diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
index 4885771..cc91267 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
@@ -39,14 +39,20 @@ public class Plc4xSourceConnector extends SourceConnector {
     public static final String QUERY_CONFIG = "query";
     private static final String QUERY_DOC = "Field query to be sent to the PLC";
 
+    public static final String RATE_CONFIG = "rate";
+    private static final Integer RATE_DEFAULT = 1000;
+    private static final String RATE_DOC = "Polling rate";
+
     private static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC)
         .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC)
-        .define(QUERY_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, QUERY_DOC);
+        .define(QUERY_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, QUERY_DOC)
+        .define(RATE_CONFIG, ConfigDef.Type.INT, RATE_DEFAULT, ConfigDef.Importance.MEDIUM, RATE_DOC);
 
     private String topic;
     private String url;
     private String query;
+    private Integer rate;
 
     @Override
     public Class<? extends Task> taskClass() {
@@ -59,6 +65,7 @@ public class Plc4xSourceConnector extends SourceConnector {
         taskConfig.put(TOPIC_CONFIG, topic);
         taskConfig.put(URL_CONFIG, url);
         taskConfig.put(QUERY_CONFIG, query);
+        taskConfig.put(RATE_CONFIG, rate.toString());
 
         // Only one task will be created; ignoring maxTasks for now
         return Collections.singletonList(taskConfig);
@@ -69,6 +76,7 @@ public class Plc4xSourceConnector extends SourceConnector {
         topic = props.get(TOPIC_CONFIG);
         url = props.get(URL_CONFIG);
         query = props.get(QUERY_CONFIG);
+        rate = Integer.valueOf(props.get(RATE_CONFIG));
     }
 
     @Override
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
index 9c3014b..7c373fe 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
@@ -36,7 +36,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.*;
 
 public class Plc4xSourceTask extends SourceTask {
     private final static String FIELD_KEY = "key";
@@ -44,12 +44,17 @@ public class Plc4xSourceTask extends SourceTask {
     private String topic;
     private String url;
     private String query;
+    private Integer rate;
 
     private volatile boolean running = false;
     private PlcConnection plcConnection;
     private PlcReader plcReader;
     private PlcReadRequest plcRequest;
 
+    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+    private ScheduledFuture<?> timer;
+    private boolean fetch = true;
+
     @Override
     public String version() {
         return VersionUtil.getVersion();
@@ -60,6 +65,7 @@ public class Plc4xSourceTask extends SourceTask {
         topic = props.get(Plc4xSourceConnector.TOPIC_CONFIG);
         url = props.get(Plc4xSourceConnector.URL_CONFIG);
         query = props.get(Plc4xSourceConnector.QUERY_CONFIG);
+        rate = Integer.valueOf(props.get(Plc4xSourceConnector.RATE_CONFIG));
 
         try {
             plcConnection = new PlcDriverManager().getConnection(url);
@@ -73,12 +79,20 @@ public class Plc4xSourceTask extends SourceTask {
 
         plcRequest = plcReader.readRequestBuilder().addItem(FIELD_KEY, query).build();
 
+        timer = scheduler.scheduleAtFixedRate(() -> {
+            synchronized (Plc4xSourceTask.this) {
+                Plc4xSourceTask.this.fetch = true;
+                notify();
+            }
+        }, 0, rate, TimeUnit.MILLISECONDS);
+
         running = true;
     }
 
     @Override
     public void stop() {
         running = false;
+        timer.cancel(true);
         if (plcConnection != null) {
             try {
                 plcConnection.close();
@@ -93,33 +107,44 @@ public class Plc4xSourceTask extends SourceTask {
         if (!running)
             return null;
 
-        List<SourceRecord> result = new LinkedList<>();
-        try {
-            PlcReadResponse<?> response = plcReader.read(plcRequest).get();
-            if (response.getResponseCode(FIELD_KEY).equals(PlcResponseCode.OK)) {
-                Object rawValue = response.getObject(FIELD_KEY);
-                Schema valueSchema = getSchema(rawValue.getClass());
-                Object value = valueSchema.equals(Schema.STRING_SCHEMA) ? rawValue.toString() : rawValue;
-                Long timestamp = System.currentTimeMillis();
-                Map<String, String> sourcePartition = Collections.singletonMap("url", url);
-                Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);
-
-                SourceRecord record =
-                    new SourceRecord(
-                        sourcePartition,
-                        sourceOffset,
-                        topic,
-                        Schema.STRING_SCHEMA,
-                        query,
-                        valueSchema,
-                        value
-                    );
-
-                result.add(record);
+        synchronized (this) {
+            while (!fetch) {
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                    // continue
+                }
+            }
+            List<SourceRecord> result = new LinkedList<>();
+            try {
+                PlcReadResponse<?> response = plcReader.read(plcRequest).get();
+                if (response.getResponseCode(FIELD_KEY).equals(PlcResponseCode.OK)) {
+                    Object rawValue = response.getObject(FIELD_KEY);
+                    Schema valueSchema = getSchema(rawValue.getClass());
+                    Object value = valueSchema.equals(Schema.STRING_SCHEMA) ? rawValue.toString() : rawValue;
+                    Long timestamp = System.currentTimeMillis();
+                    Map<String, String> sourcePartition = Collections.singletonMap("url", url);
+                    Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);
+
+                    SourceRecord record =
+                        new SourceRecord(
+                            sourcePartition,
+                            sourceOffset,
+                            topic,
+                            Schema.STRING_SCHEMA,
+                            query,
+                            valueSchema,
+                            value
+                        );
+
+                    result.add(record);
+                }
+                return result;
+            } catch (InterruptedException | ExecutionException e) {
+                return null;
+            } finally {
+                fetch = false;
             }
-            return result;
-        } catch (InterruptedException | ExecutionException e) {
-            return null;
         }
     }
 
@@ -127,7 +152,7 @@ public class Plc4xSourceTask extends SourceTask {
         if (type.equals(Integer.class))
             return Schema.INT32_SCHEMA;
 
-        return Schema.STRING_SCHEMA;
+        return Schema.STRING_SCHEMA; // default schema
     }
 
 }
\ No newline at end of file