You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2019/08/14 17:08:19 UTC
[plc4x] 04/04: - Continued working on the scraper based Kafka
Connect plugin
This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch featule/kafka-connect-refactoring
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 7c0303fc2dc37c6f8c0bffc4d74ec76d42d1cf49
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Wed Aug 14 19:08:01 2019 +0200
- Continued working on the scraper based Kafka Connect plugin
---
plc4cpp/pom.xml | 7 +-
plc4j/integrations/apache-kafka/README.md | 8 +-
.../apache-kafka/config/sink.properties | 23 --
plc4j/integrations/apache-kafka/pom.xml | 77 ++++++
.../apache/plc4x/kafka/Plc4xSourceConnector.java | 91 +++----
.../org/apache/plc4x/kafka/Plc4xSourceTask.java | 288 ++++++++++-----------
.../apache/plc4x/kafka/config/SourceConfig.java | 14 +
.../JobConfigurationTriggeredImplBuilder.java | 2 +-
plc4py/src/main/python/__init__.py | 9 -
9 files changed, 278 insertions(+), 241 deletions(-)
diff --git a/plc4cpp/pom.xml b/plc4cpp/pom.xml
index a1f0eb5..3b1796d 100644
--- a/plc4cpp/pom.xml
+++ b/plc4cpp/pom.xml
@@ -33,12 +33,9 @@
<name>PLC4Cpp</name>
<description>Implementation of the protocol adapters for usage as C++ library.</description>
- <!-- Disabled for now as C++ support is currently not installed in Apache SonarQube -->
- <!--properties>
- <sonar.language>c++</sonar.language>
- </properties-->
-
<properties>
+ <!-- Tell Sonar where to find the c++ sources -->
+ <sonar.sources>api/src/main/cpp,drivers/proxy/src/main/cpp,drivers/s7/src/main/cpp,protocols/driver-bases/base/src/main/cpp,protocols/s7/src/main/cpp,utils/logger/src/main/cpp,utils/systemconfig/src/main/cpp</sonar.sources>
<option.with-proxies>OFF</option.with-proxies>
</properties>
diff --git a/plc4j/integrations/apache-kafka/README.md b/plc4j/integrations/apache-kafka/README.md
index a2a073e..2c29fcb 100644
--- a/plc4j/integrations/apache-kafka/README.md
+++ b/plc4j/integrations/apache-kafka/README.md
@@ -19,16 +19,12 @@
# Kafka Connect PLC4X Connector
-The PLC4X Connector streams data from and to any device accessible through the PLC4X interface.
+The PLC4X Connector streams data from any device accessible through the PLC4X interface.
## Source Connector
See `config/source.properties` for example configuration.
-## Sink Connector
-
-See `config/sink.properties` for example configuration.
-
## Quickstart
A Kafka Connect worker can be run in two modes:
@@ -42,7 +38,7 @@ In order to start a Kafka Connect system the following steps have to be performe
1) Download the latest version of Apache Kafka binaries from here: https://kafka.apache.org/downloads
2) Unpack the archive.
-3) Copy the target/apache-kafka-0.5.0-SNAPSHOT.jar to the Kafka "libs" directory.
+3) Copy the target/apache-kafka-0.5.0-SNAPSHOT-jar-with-dependencies.jar to the Kafka "libs" directory.
4) Copy the files in the "config" to Kafka's "configs" directory (maybe inside a "plc4x" subdirectory)
### Start a Kafka Broker
diff --git a/plc4j/integrations/apache-kafka/config/sink.properties b/plc4j/integrations/apache-kafka/config/sink.properties
deleted file mode 100644
index 8da6eae..0000000
--- a/plc4j/integrations/apache-kafka/config/sink.properties
+++ /dev/null
@@ -1,23 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-name=plc-sink-test
-connector.class=org.apache.plc4x.kafka.Plc4xSinkConnector
-topics=test
-url=test:unused
-transforms=key
-transforms.key.type=org.apache.kafka.connect.transforms.ExtractField$Key
-transforms.key.field=query
\ No newline at end of file
diff --git a/plc4j/integrations/apache-kafka/pom.xml b/plc4j/integrations/apache-kafka/pom.xml
index 857f412..6dfc05e 100644
--- a/plc4j/integrations/apache-kafka/pom.xml
+++ b/plc4j/integrations/apache-kafka/pom.xml
@@ -35,6 +35,45 @@
<kafka.version>2.3.0</kafka.version>
</properties>
+ <!-- Build a fat jar so the user only needs to drop in this one jar -->
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>assemble-all</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <configuration>
+ <usedDependencies combine.children="append">
+ <usedDependency>org.apache.plc4x:plc4j-scraper</usedDependency>
+ <usedDependency>org.apache.plc4x:plc4j-driver-ads</usedDependency>
+ <usedDependency>org.apache.plc4x:plc4j-driver-ethernet-ip</usedDependency>
+ <usedDependency>org.apache.plc4x:plc4j-driver-modbus</usedDependency>
+ <usedDependency>org.apache.plc4x:plc4j-driver-opcua</usedDependency>
+ <usedDependency>org.apache.plc4x:plc4j-driver-s7</usedDependency>
+ <usedDependency>org.apache.plc4x:plc4j-driver-simulated</usedDependency>
+ </usedDependencies>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
<dependencies>
<dependency>
<groupId>org.apache.plc4x</groupId>
@@ -64,6 +103,44 @@
<version>${kafka.version}</version>
<scope>provided</scope>
</dependency>
+
+ <!-- Include all drivers -->
+ <dependency>
+ <groupId>org.apache.plc4x</groupId>
+ <artifactId>plc4j-driver-ads</artifactId>
+ <version>0.5.0-SNAPSHOT</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.plc4x</groupId>
+ <artifactId>plc4j-driver-ethernet-ip</artifactId>
+ <version>0.5.0-SNAPSHOT</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.plc4x</groupId>
+ <artifactId>plc4j-driver-modbus</artifactId>
+ <version>0.5.0-SNAPSHOT</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.plc4x</groupId>
+ <artifactId>plc4j-driver-opcua</artifactId>
+ <version>0.5.0-SNAPSHOT</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.plc4x</groupId>
+ <artifactId>plc4j-driver-s7</artifactId>
+ <version>0.5.0-SNAPSHOT</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.plc4x</groupId>
+ <artifactId>plc4j-driver-simulated</artifactId>
+ <version>0.5.0-SNAPSHOT</version>
+ <scope>runtime</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
index ffb6b35..858bd3d 100644
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
@@ -21,6 +21,9 @@ package org.apache.plc4x.kafka;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.plc4x.kafka.config.Job;
+import org.apache.plc4x.kafka.config.JobReference;
+import org.apache.plc4x.kafka.config.Source;
import org.apache.plc4x.kafka.config.SourceConfig;
import org.apache.plc4x.kafka.exceptions.ConfigurationException;
import org.apache.plc4x.kafka.util.VersionUtil;
@@ -42,11 +45,14 @@ public class Plc4xSourceConnector extends SourceConnector {
try {
sourceConfig = SourceConfig.fromPropertyMap(props);
} catch (ConfigurationException e) {
+ log.error("Error processing source configuration", e);
}
}
@Override
- public void stop() {}
+ public void stop() {
+ sourceConfig = null;
+ }
@Override
public Class<? extends Task> taskClass() {
@@ -54,59 +60,46 @@ public class Plc4xSourceConnector extends SourceConnector {
}
@Override
- @SuppressWarnings("unchecked")
public List<Map<String, String>> taskConfigs(int maxTasks) {
+ // Initially we planned to have the simple assumption that one task maps to one PLC connection.
+ // But we could easily say that one scraper instance maps to a task and one scraper task can
+ // process multiple PLC connections. But I guess this would be an optimization as we have to
+ // balance the load manually.
+ if(sourceConfig.getJobs().size() > maxTasks) {
+ // not enough tasks
+ log.warn("NOT ENOUGH TASKS!");
+ return Collections.emptyList();
+ }
+
+ // For each configured source we'll start a dedicated scraper instance collecting
+ // all the scraper jobs enabled for this source.
List<Map<String, String>> configs = new LinkedList<>();
- /*if (json.isEmpty()) {
- Map<String, List<String>> groupedByHost = new HashMap<>();
- queries.stream().map(query -> query.split("#", 2)).collect(Collectors.groupingBy(parts -> parts[0])).forEach((host, q) ->
- groupedByHost.put(host, q.stream().map(parts -> parts[1]).collect(Collectors.toList())));
- if (groupedByHost.size() > maxTasks) {
- // Not enough tasks
- // TODO: throw exception?
- return Collections.emptyList();
- }
- groupedByHost.forEach((host, qs) -> {
- Map<String, String> taskConfig = new HashMap<>();
- taskConfig.put(Plc4xSourceTask.TOPIC_CONFIG, topic);
- taskConfig.put(Plc4xSourceTask.URL_CONFIG, host);
- taskConfig.put(Plc4xSourceTask.QUERIES_CONFIG, String.join(",", qs));
- taskConfig.put(Plc4xSourceTask.RATE_CONFIG, rate.toString());
- configs.add(taskConfig);
- });
- } else {
- try {
- // TODO
- String config = new Scanner(new URL(json).openStream(), StandardCharsets.UTF_8.name()).useDelimiter("\\A").next();
- ObjectMapper mapper = new ObjectMapper();
- Map<String, Object> values = mapper.readValue(config, new TypeReference<Map<String, Object>>() {});
- List<Map<String, Object>> plcs = (List<Map<String, Object>>) values.get("PLCs");
- log.info("TASKS REQUIRED: " + plcs.size());
- if (plcs.size() > maxTasks) {
- // not enough tasks
- log.warn("NOT ENOUGH TASKS!");
- return Collections.emptyList();
- }
- for (Map<String, Object> plc : plcs) {
- Map<String, String> taskConfig = new HashMap<>();
- String ip = plc.get("IP").toString();
- String topic = ip;
- String url = "s7://" + ip + "/1/" + plc.get("Slot");
- List<String> queries = new LinkedList<>();
- for (Map<String, Object> operand : (List<Map<String, Object>>)plc.get("operands")) {
- String query = "%" + operand.get("Operand") + ":" + operand.get("Datatype");
- queries.add(query);
+ for (Source source : sourceConfig.getSources()) {
+ // Build a list of job configurations only containing the ones referenced from
+ // the current source.
+ StringBuilder query = new StringBuilder();
+ for (JobReference jobReference : source.getJobReferences()) {
+ Job job = sourceConfig.getJob(jobReference.getName());
+ if(job == null) {
+ log.warn(String.format("Couldn't find referenced job '%s'", jobReference.getName()));
+ } else if(jobReference.isEnabled()) {
+ query.append(",").append(jobReference.getName()).append(":").append(jobReference.getTopic());
+ query.append(":").append(job.getInterval());
+ for (Map.Entry<String, String> field : job.getFields().entrySet()) {
+ String fieldName = field.getKey();
+ String fieldAddress = field.getValue();
+ query.append(":").append(fieldName).append("#").append(fieldAddress);
}
- taskConfig.put(Plc4xSourceTask.TOPIC_CONFIG, topic);
- taskConfig.put(Plc4xSourceTask.URL_CONFIG, url);
- taskConfig.put(RATE_CONFIG, rate.toString());
- taskConfig.put(Plc4xSourceTask.QUERIES_CONFIG, String.join(",", queries));
- configs.add(taskConfig);
}
- } catch (IOException e) {
- log.error("ERROR CONFIGURING TASK", e);
}
- }*/
+
+ // Create a new task configuration.
+ Map<String, String> taskConfig = new HashMap<>();
+ taskConfig.put(Plc4xSourceTask.CONNECTION_NAME_CONFIG, source.getName());
+ taskConfig.put(Plc4xSourceTask.PLC4X_CONNECTION_STRING_CONFIG, source.getConnectionString());
+ taskConfig.put(Plc4xSourceTask.QUERIES_CONFIG, query.toString().substring(1));
+ configs.add(taskConfig);
+ }
return configs;
}
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
index dfc1c04..dc169fe 100644
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
@@ -27,13 +27,18 @@ import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.api.PlcConnection;
-import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
-import org.apache.plc4x.java.api.messages.PlcReadRequest;
-import org.apache.plc4x.java.api.messages.PlcReadResponse;
-import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.apache.plc4x.java.scraper.ResultHandler;
+import org.apache.plc4x.java.scraper.config.triggeredscraper.JobConfigurationTriggeredImplBuilder;
+import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImpl;
+import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImplBuilder;
+import org.apache.plc4x.java.scraper.exception.ScraperException;
+import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl;
+import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollector;
+import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollectorImpl;
+import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
import org.apache.plc4x.kafka.util.VersionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.*;
@@ -46,46 +51,48 @@ import java.util.concurrent.*;
*/
public class Plc4xSourceTask extends SourceTask {
- static final String TOPIC_CONFIG = "topic";
- private static final String TOPIC_DOC = "Kafka topic to publish to";
+ private static final Logger log = LoggerFactory.getLogger(Plc4xSourceTask.class);
- static final String URL_CONFIG = "url";
- private static final String URL_DOC = "PLC URL";
+ /*
+ * Config of the task.
+ */
+ static final String CONNECTION_NAME_CONFIG = "connection-name";
+ private static final String CONNECTION_NAME_STRING_DOC = "Connection Name";
+
+ static final String PLC4X_CONNECTION_STRING_CONFIG = "plc4x-connection-string";
+ private static final String PLC4X_CONNECTION_STRING_DOC = "PLC4X Connection String";
+ // Syntax for the queries: {job-name}:{topic}:{rate}:{field-alias}#{field-address}:{field-alias}#{field-address}...,{job-name}:{topic}:{rate}:....
static final String QUERIES_CONFIG = "queries";
private static final String QUERIES_DOC = "Field queries to be sent to the PLC";
- static final String RATE_CONFIG = "rate";
- private static final Integer RATE_DEFAULT = 1000;
- private static final String RATE_DOC = "Polling rate";
-
private static final ConfigDef CONFIG_DEF = new ConfigDef()
- .define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC)
- .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC)
- .define(QUERIES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, QUERIES_DOC)
- .define(RATE_CONFIG, ConfigDef.Type.INT, RATE_DEFAULT, ConfigDef.Importance.MEDIUM, RATE_DOC);
+ .define(CONNECTION_NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, CONNECTION_NAME_STRING_DOC)
+ .define(PLC4X_CONNECTION_STRING_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, PLC4X_CONNECTION_STRING_DOC)
+ .define(QUERIES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, QUERIES_DOC);
- private static final long WAIT_LIMIT_MILLIS = 100;
- private static final long TIMEOUT_LIMIT_MILLIS = 5000;
-
- private static final String URL_FIELD = "url";
- private static final String QUERY_FIELD = "query";
+ /*
+ * Configuration of the output.
+ */
+ private static final String SOURCE_NAME_FIELD = "source-name";
+ private static final String JOB_NAME_FIELD = "job-name";
+ private static final String FIELD_NAME_FIELD = "field-name";
private static final Schema KEY_SCHEMA =
new SchemaBuilder(Schema.Type.STRUCT)
- .field(URL_FIELD, Schema.STRING_SCHEMA)
- .field(QUERY_FIELD, Schema.STRING_SCHEMA)
+ .field(SOURCE_NAME_FIELD, Schema.STRING_SCHEMA)
+ .field(JOB_NAME_FIELD, Schema.STRING_SCHEMA)
+ .field(FIELD_NAME_FIELD, Schema.STRING_SCHEMA)
.build();
- private String topic;
- private String url;
- private List<String> queries;
-
- private PlcConnection plcConnection;
+ // Internal properties.
+ private Map<String, String> topics;
+ private PlcDriverManager plcDriverManager;
+ private TriggerCollector triggerCollector;
+ private TriggeredScraperImpl scraper;
- // TODO: should we use shared (static) thread pool for this?
- private ScheduledExecutorService scheduler;
- private boolean fetch = true;
+ // Internal buffer into which all incoming scraper responses are written.
+ private ArrayBlockingQueue<SourceRecord> buffer;
@Override
public String version() {
@@ -95,134 +102,119 @@ public class Plc4xSourceTask extends SourceTask {
@Override
public void start(Map<String, String> props) {
AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
- topic = config.getString(TOPIC_CONFIG);
- url = config.getString(URL_CONFIG);
- queries = config.getList(QUERIES_CONFIG);
-
- openConnection();
-
- if (!plcConnection.getMetadata().canRead()) {
- throw new ConnectException("Reading not supported on this connection");
- }
-
- int rate = Integer.parseInt(props.get(RATE_CONFIG));
- scheduler = Executors.newScheduledThreadPool(1);
- scheduler.scheduleAtFixedRate(Plc4xSourceTask.this::scheduleFetch, rate, rate, TimeUnit.MILLISECONDS);
- }
-
- @Override
- public void stop() {
- scheduler.shutdown();
- closeConnection();
- synchronized (this) {
- notify(); // wake up thread waiting in awaitFetch
- }
- }
-
- @Override
- public List<SourceRecord> poll() throws InterruptedException {
- return awaitFetch(WAIT_LIMIT_MILLIS) ? doFetch() : null;
- }
-
- private void openConnection() {
- try {
- plcConnection = new PlcDriverManager().getConnection(url);
- plcConnection.connect();
- } catch (PlcConnectionException e) {
- throw new ConnectException("Could not establish a PLC connection", e);
- }
- }
+ String connectionName = config.getString(CONNECTION_NAME_CONFIG);
+ String plc4xConnectionString = config.getString(PLC4X_CONNECTION_STRING_CONFIG);
+ topics = new HashMap<>();
+ // Create a buffer with a capacity of 1000 elements which schedules access in a fair way.
+ buffer = new ArrayBlockingQueue<>(1000, true);
+
+ ScraperConfigurationTriggeredImplBuilder builder = new ScraperConfigurationTriggeredImplBuilder();
+ builder.addSource(connectionName, plc4xConnectionString);
+
+ List<String> jobConfigs = config.getList(QUERIES_CONFIG);
+ for (String jobConfig : jobConfigs) {
+ String[] jobConfigSegments = jobConfig.split(":");
+ if(jobConfigSegments.length < 4) {
+ log.warn(String.format("Error in job configuration '%s'. " +
+ "The configuration expects at least 4 segments: " +
+ "{job-name}:{topic}:{rate}(:{field-alias}#{field-address})+", jobConfig));
+ continue;
+ }
- private void closeConnection() {
- if (plcConnection != null) {
- try {
- plcConnection.close();
- } catch (Exception e) {
- throw new PlcRuntimeException("Caught exception while closing connection to PLC", e);
+ String jobName = jobConfigSegments[0];
+ String topic = jobConfigSegments[1];
+ Integer rate = Integer.valueOf(jobConfigSegments[2]);
+ JobConfigurationTriggeredImplBuilder jobBuilder = builder.job(
+ jobName, String.format("(SCHEDULED,%s)", rate)).source(connectionName);
+ for(int i = 3; i < jobConfigSegments.length; i++) {
+ String[] fieldSegments = jobConfigSegments[i].split("=");
+ if(fieldSegments.length != 2) {
+ log.warn(String.format("Error in job configuration '%s'. " +
+ "The field segment expects a format {field-alias}#{field-address}, but got '%s'",
+ jobName, jobConfigSegments[i]));
+ continue;
+ }
+ String fieldAlias = fieldSegments[0];
+ String fieldAddress = fieldSegments[1];
+ jobBuilder.field(fieldAlias, fieldAddress);
+ topics.put(jobName, topic);
}
}
- }
- /**
- * Schedule next fetch operation.
- */
- private synchronized void scheduleFetch() {
- fetch = true;
- notify();
- }
+ ScraperConfigurationTriggeredImpl scraperConfig = builder.build();
- /**
- * Wait for next scheduled fetch operation or till the source is closed.
- * @param milliseconds maximum time to wait
- * @throws InterruptedException if the thread is interrupted
- * @return true if a fetch should be performed, false otherwise
- */
- private synchronized boolean awaitFetch(long milliseconds) throws InterruptedException {
- if (!fetch) {
- wait(milliseconds);
- }
try {
- return fetch;
- } finally {
- fetch = false;
+ plcDriverManager = new PooledPlcDriverManager();
+ triggerCollector = new TriggerCollectorImpl(plcDriverManager);
+ scraper = new TriggeredScraperImpl(scraperConfig, new ResultHandler() {
+ @Override
+ public void handle(String jobName, String sourceName, Map<String, Object> results) {
+ Long timestamp = System.currentTimeMillis();
+
+ Map<String, String> sourcePartition = new HashMap<>();
+ sourcePartition.put("sourceName", sourceName);
+ sourcePartition.put("jobName", jobName);
+
+ Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);
+
+ String topic = topics.get(jobName);
+
+ for (Map.Entry<String, Object> result : results.entrySet()) {
+ // Get field-name and -value from the results.
+ String fieldName = result.getKey();
+ Object fieldValue = result.getValue();
+
+ // Prepare the key structure.
+ Struct key = new Struct(KEY_SCHEMA)
+ .put(SOURCE_NAME_FIELD, sourceName)
+ .put(JOB_NAME_FIELD, jobName)
+ .put(FIELD_NAME_FIELD, fieldName);
+
+ // Get the schema for the given value type.
+ Schema valueSchema = getSchema(fieldValue);
+
+ // Prepare the source-record element.
+ SourceRecord record =
+ new SourceRecord(
+ sourcePartition,
+ sourceOffset,
+ topic,
+ KEY_SCHEMA,
+ key,
+ valueSchema,
+ fieldValue
+ );
+
+ // Add the new source-record to the buffer.
+ buffer.add(record);
+ }
+ }
+ }, triggerCollector);
+ scraper.start();
+ triggerCollector.start();
+ } catch (ScraperException e) {
+ log.error("Error starting the scraper", e);
}
}
- private List<SourceRecord> doFetch() throws InterruptedException {
- final CompletableFuture<? extends PlcReadResponse> response = createReadRequest().execute();
- try {
- final PlcReadResponse received = response.get(TIMEOUT_LIMIT_MILLIS, TimeUnit.MILLISECONDS);
- return extractValues(received);
- } catch (ExecutionException e) {
- throw new ConnectException("Could not fetch data from source", e);
- } catch (TimeoutException e) {
- throw new ConnectException("Timed out waiting for data from source", e);
- }
- }
-
- private PlcReadRequest createReadRequest() {
- PlcReadRequest.Builder builder = plcConnection.readRequestBuilder();
- for (String query : queries) {
- builder.addItem(query, query);
+ @Override
+ public void stop() {
+ synchronized (this) {
+ // TODO: Correctly shut down the scraper.
+ notifyAll(); // wake up thread waiting in awaitFetch
}
- return builder.build();
}
- private List<SourceRecord> extractValues(PlcReadResponse response) {
- final List<SourceRecord> result = new LinkedList<>();
- for (String query : queries) {
- final PlcResponseCode rc = response.getResponseCode(query);
- if (!rc.equals(PlcResponseCode.OK)) {
- continue;
- }
-
- Struct key = new Struct(KEY_SCHEMA)
- .put(URL_FIELD, url)
- .put(QUERY_FIELD, query);
-
- Object value = response.getObject(query);
- Schema valueSchema = getSchema(value);
- Long timestamp = System.currentTimeMillis();
- Map<String, String> sourcePartition = new HashMap<>();
- sourcePartition.put("url", url);
- sourcePartition.put("query", query);
- Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);
-
- SourceRecord record =
- new SourceRecord(
- sourcePartition,
- sourceOffset,
- topic,
- KEY_SCHEMA,
- key,
- valueSchema,
- value
- );
-
- result.add(record);
+ @Override
+ public List<SourceRecord> poll() {
+ if(!buffer.isEmpty()) {
+ int numElements = buffer.size();
+ List<SourceRecord> result = new ArrayList<>(numElements);
+ buffer.drainTo(result, numElements);
+ return result;
+ } else {
+ return Collections.emptyList();
}
-
- return result;
}
private Schema getSchema(Object value) {
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/SourceConfig.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/SourceConfig.java
index 08d133e..91093ac 100644
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/SourceConfig.java
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/SourceConfig.java
@@ -139,8 +139,22 @@ public class SourceConfig {
return sources;
}
+ public Source getSource(String sourceName) {
+ if(sources == null) {
+ return null;
+ }
+ return sources.stream().filter(source -> source.getName().equals(sourceName)).findFirst().orElse(null);
+ }
+
public List<Job> getJobs() {
return jobs;
}
+ public Job getJob(String jobName) {
+ if(jobs == null) {
+ return null;
+ }
+ return jobs.stream().filter(job -> job.getName().equals(jobName)).findFirst().orElse(null);
+ }
+
}
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/triggeredscraper/JobConfigurationTriggeredImplBuilder.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/triggeredscraper/JobConfigurationTriggeredImplBuilder.java
index 48b7bd6..502e995 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/triggeredscraper/JobConfigurationTriggeredImplBuilder.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/triggeredscraper/JobConfigurationTriggeredImplBuilder.java
@@ -61,7 +61,7 @@ public class JobConfigurationTriggeredImplBuilder {
}
private JobConfigurationTriggeredImpl buildInternal() {
- return new JobConfigurationTriggeredImpl(name, triggerConfig,null, sources, fields);
+ return new JobConfigurationTriggeredImpl(name, triggerConfig, null, sources, fields);
}
public ScraperConfigurationTriggeredImplBuilder build() {
diff --git a/plc4py/src/main/python/__init__.py b/plc4py/src/main/python/__init__.py
index 6fa4b09..c3434ab 100644
--- a/plc4py/src/main/python/__init__.py
+++ b/plc4py/src/main/python/__init__.py
@@ -14,13 +14,4 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.