You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2018/09/10 16:20:16 UTC
[incubator-plc4x] 02/08: implemented throttling
This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch skorikov-feature/api-redesign-chris-c
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
commit 1b0ca49c45bb91cebab09d820003ad8f89c4b040
Author: Andrey Skorikov <an...@codecentric.de>
AuthorDate: Mon Sep 10 11:50:00 2018 +0200
implemented throttling
---
.../apache/plc4x/kafka/Plc4xSourceConnector.java | 10 ++-
.../apache/plc4x/kafka/source/Plc4xSourceTask.java | 81 ++++++++++++++--------
2 files changed, 62 insertions(+), 29 deletions(-)
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
index 4885771..cc91267 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
@@ -39,14 +39,20 @@ public class Plc4xSourceConnector extends SourceConnector {
public static final String QUERY_CONFIG = "query";
private static final String QUERY_DOC = "Field query to be sent to the PLC";
+ public static final String RATE_CONFIG = "rate";
+ private static final Integer RATE_DEFAULT = 1000;
+ private static final String RATE_DOC = "Polling rate";
+
private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC)
.define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC)
- .define(QUERY_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, QUERY_DOC);
+ .define(QUERY_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, QUERY_DOC)
+ .define(RATE_CONFIG, ConfigDef.Type.INT, RATE_DEFAULT, ConfigDef.Importance.MEDIUM, RATE_DOC);
private String topic;
private String url;
private String query;
+ private Integer rate;
@Override
public Class<? extends Task> taskClass() {
@@ -59,6 +65,7 @@ public class Plc4xSourceConnector extends SourceConnector {
taskConfig.put(TOPIC_CONFIG, topic);
taskConfig.put(URL_CONFIG, url);
taskConfig.put(QUERY_CONFIG, query);
+ taskConfig.put(RATE_CONFIG, rate.toString());
// Only one task will be created; ignoring maxTasks for now
return Collections.singletonList(taskConfig);
@@ -69,6 +76,7 @@ public class Plc4xSourceConnector extends SourceConnector {
topic = props.get(TOPIC_CONFIG);
url = props.get(URL_CONFIG);
query = props.get(QUERY_CONFIG);
+ rate = Integer.valueOf(props.get(RATE_CONFIG));
}
@Override
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
index 9c3014b..7c373fe 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
@@ -36,7 +36,7 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.*;
public class Plc4xSourceTask extends SourceTask {
private final static String FIELD_KEY = "key";
@@ -44,12 +44,17 @@ public class Plc4xSourceTask extends SourceTask {
private String topic;
private String url;
private String query;
+ private Integer rate;
private volatile boolean running = false;
private PlcConnection plcConnection;
private PlcReader plcReader;
private PlcReadRequest plcRequest;
+ private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+ private ScheduledFuture<?> timer;
+ private boolean fetch = true;
+
@Override
public String version() {
return VersionUtil.getVersion();
@@ -60,6 +65,7 @@ public class Plc4xSourceTask extends SourceTask {
topic = props.get(Plc4xSourceConnector.TOPIC_CONFIG);
url = props.get(Plc4xSourceConnector.URL_CONFIG);
query = props.get(Plc4xSourceConnector.QUERY_CONFIG);
+ rate = Integer.valueOf(props.get(Plc4xSourceConnector.RATE_CONFIG));
try {
plcConnection = new PlcDriverManager().getConnection(url);
@@ -73,12 +79,20 @@ public class Plc4xSourceTask extends SourceTask {
plcRequest = plcReader.readRequestBuilder().addItem(FIELD_KEY, query).build();
+ timer = scheduler.scheduleAtFixedRate(() -> {
+ synchronized (Plc4xSourceTask.this) {
+ Plc4xSourceTask.this.fetch = true;
+ notify();
+ }
+ }, 0, rate, TimeUnit.MILLISECONDS);
+
running = true;
}
@Override
public void stop() {
running = false;
+ timer.cancel(true);
if (plcConnection != null) {
try {
plcConnection.close();
@@ -93,33 +107,44 @@ public class Plc4xSourceTask extends SourceTask {
if (!running)
return null;
- List<SourceRecord> result = new LinkedList<>();
- try {
- PlcReadResponse<?> response = plcReader.read(plcRequest).get();
- if (response.getResponseCode(FIELD_KEY).equals(PlcResponseCode.OK)) {
- Object rawValue = response.getObject(FIELD_KEY);
- Schema valueSchema = getSchema(rawValue.getClass());
- Object value = valueSchema.equals(Schema.STRING_SCHEMA) ? rawValue.toString() : rawValue;
- Long timestamp = System.currentTimeMillis();
- Map<String, String> sourcePartition = Collections.singletonMap("url", url);
- Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);
-
- SourceRecord record =
- new SourceRecord(
- sourcePartition,
- sourceOffset,
- topic,
- Schema.STRING_SCHEMA,
- query,
- valueSchema,
- value
- );
-
- result.add(record);
+ synchronized (this) {
+ while (!fetch) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ // continue
+ }
+ }
+ List<SourceRecord> result = new LinkedList<>();
+ try {
+ PlcReadResponse<?> response = plcReader.read(plcRequest).get();
+ if (response.getResponseCode(FIELD_KEY).equals(PlcResponseCode.OK)) {
+ Object rawValue = response.getObject(FIELD_KEY);
+ Schema valueSchema = getSchema(rawValue.getClass());
+ Object value = valueSchema.equals(Schema.STRING_SCHEMA) ? rawValue.toString() : rawValue;
+ Long timestamp = System.currentTimeMillis();
+ Map<String, String> sourcePartition = Collections.singletonMap("url", url);
+ Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);
+
+ SourceRecord record =
+ new SourceRecord(
+ sourcePartition,
+ sourceOffset,
+ topic,
+ Schema.STRING_SCHEMA,
+ query,
+ valueSchema,
+ value
+ );
+
+ result.add(record);
+ }
+ return result;
+ } catch (InterruptedException | ExecutionException e) {
+ return null;
+ } finally {
+ fetch = false;
}
- return result;
- } catch (InterruptedException | ExecutionException e) {
- return null;
}
}
@@ -127,7 +152,7 @@ public class Plc4xSourceTask extends SourceTask {
if (type.equals(Integer.class))
return Schema.INT32_SCHEMA;
- return Schema.STRING_SCHEMA;
+ return Schema.STRING_SCHEMA; // default schema
}
}
\ No newline at end of file