You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2018/09/06 21:14:54 UTC
[incubator-plc4x] 03/05: added configuration options
This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch feature/apache-kafka
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
commit b430e28a55684762e8ea93212c210677f6cd2448
Author: Andrey Skorikov <an...@codecentric.de>
AuthorDate: Thu Sep 6 11:59:19 2018 +0200
added configuration options
---
.../org/apache/plc4x/kafka/common/Plc4xConfig.java | 26 ++++++++++++-
.../apache/plc4x/kafka/source/Plc4xSourceTask.java | 44 ++++++++++++++++------
2 files changed, 57 insertions(+), 13 deletions(-)
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/common/Plc4xConfig.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/common/Plc4xConfig.java
index bceedfc..b906fab 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/common/Plc4xConfig.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/common/Plc4xConfig.java
@@ -33,6 +33,15 @@ public class Plc4xConfig extends AbstractConfig {
public static final String PLC_CONNECTION_STRING_DISPLAY = "PLC Connection String";
public static final String PLC_CONNECTION_STRING_DOC = "Connection string used by PLC4X to connect to the PLC.";
+ public static final String PLC_TOPIC = "topic";
+ public static final String PLC_TOPIC_DOC = "Kafka topic to publish messages to.";
+
+ public static final String PLC_DATATYPE_CONFIG = "type";
+ public static final String PLC_DATATYPE_DOC = "Data type of values sent or received by PLC.";
+
+ public static final String PLC_ADDRESS = "address";
+ public static final String PLC_ADDRESS_DOC = "PLC address to sent to or receive data from.";
+
public static ConfigDef baseConfigDef() {
ConfigDef config = new ConfigDef();
addPlcOptions(config);
@@ -44,7 +53,22 @@ public class Plc4xConfig extends AbstractConfig {
PLC_CONNECTION_STRING_CONFIG,
Type.STRING,
Importance.HIGH,
- PLC_CONNECTION_STRING_DOC);
+ PLC_CONNECTION_STRING_DOC)
+ .define(
+ PLC_DATATYPE_CONFIG,
+ Type.CLASS,
+ Importance.HIGH,
+ PLC_DATATYPE_DOC)
+ .define(
+ PLC_TOPIC,
+ Type.STRING,
+ Importance.HIGH,
+ PLC_TOPIC_DOC)
+ .define(
+ PLC_ADDRESS,
+ Type.STRING,
+ Importance.HIGH,
+ PLC_ADDRESS_DOC);
}
public static final ConfigDef CONFIG_DEF = baseConfigDef();
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
index 36e37a0..040df2f 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
@@ -19,8 +19,7 @@ under the License.
package org.apache.plc4x.kafka.source;
import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
@@ -28,6 +27,7 @@ import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.api.connection.PlcConnection;
import org.apache.plc4x.java.api.connection.PlcReader;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.exceptions.PlcInvalidAddressException;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
import org.apache.plc4x.java.api.messages.items.ReadResponseItem;
@@ -36,11 +36,7 @@ import org.apache.plc4x.kafka.util.VersionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
+import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -53,6 +49,11 @@ public class Plc4xSourceTask extends SourceTask {
private PlcReader reader;
private PlcReadRequest readRequest;
private AtomicBoolean running = new AtomicBoolean(false);
+ private String topic;
+ private Schema keySchema = Schema.STRING_SCHEMA;
+ private Schema valueSchema;
+ private long offset = 0;
+ private final Map<Class<?>, Schema> typeSchemas = initTypeSchemas();
@Override
public String version() {
@@ -64,7 +65,7 @@ public class Plc4xSourceTask extends SourceTask {
try {
config = new Plc4xSourceConfig(properties);
} catch (ConfigException e) {
- throw new ConnectException("Couldn't start JdbcSourceTask due to configuration error", e);
+ throw new ConnectException("Couldn't start Plc4xSourceTask due to configuration error", e);
}
final String url = config.getString(Plc4xSourceConfig.PLC_CONNECTION_STRING_CONFIG);
@@ -75,12 +76,28 @@ public class Plc4xSourceTask extends SourceTask {
throw new ConnectException("PlcReader not available for this type of connection");
}
reader = readerOptional.get();
+ Class<?> dataType = config.getClass(Plc4xSourceConfig.PLC_DATATYPE_CONFIG);
+ String addressString = config.getString(Plc4xSourceConfig.PLC_ADDRESS);
+ Address address = plcConnection.parseAddress(addressString);
+ readRequest = new PlcReadRequest(dataType, address);
+ topic = config.getString(Plc4xSourceConfig.PLC_TOPIC);
+ valueSchema = typeSchemas.get(dataType);
running.set(true);
} catch (PlcConnectionException e) {
throw new ConnectException("Caught exception while connecting to PLC", e);
+ } catch (PlcInvalidAddressException e) {
+ throw new ConnectException("Invalid PLC address", e);
}
}
+ private Map<Class<?>, Schema> initTypeSchemas() {
+ Map<Class<?>, Schema> map = new HashMap<>();
+ map.put(Boolean.class, Schema.BOOLEAN_SCHEMA);
+ map.put(Integer.class, Schema.INT32_SCHEMA);
+ // TODO add other
+ return map;
+ }
+
@Override
public void stop() {
if(plcConnection != null) {
@@ -103,10 +120,13 @@ public class Plc4xSourceTask extends SourceTask {
for (ReadResponseItem<?> responseItem : plcReadResponse.getResponseItems()) {
Address address = responseItem.getRequestItem().getAddress();
- List<?> values = responseItem.getValues();
-
- // TODO: Implement Sending this information to Kafka ...
- //results.add(new SourceRecord())
+ for (Object value : responseItem.getValues()) {
+ Map<String, String> sourcePartition = Collections.singletonMap("address", address.toString());
+ Map<String, Long> sourceOffset = Collections.singletonMap("offset", offset);
+ SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, topic, keySchema, address.toString(), valueSchema, value);
+ results.add(record);
+ offset++; // TODO: figure out how to track offsets
+ }
}
} catch (ExecutionException e) {
log.error("Error reading values from PLC", e);