Posted to commits@plc4x.apache.org by cd...@apache.org on 2019/08/14 17:08:19 UTC

[plc4x] 04/04: - Continued working on the scraper-based Kafka Connect plugin

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch feature/kafka-connect-refactoring
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 7c0303fc2dc37c6f8c0bffc4d74ec76d42d1cf49
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Wed Aug 14 19:08:01 2019 +0200

    - Continued working on the scraper-based Kafka Connect plugin
---
 plc4cpp/pom.xml                                    |   7 +-
 plc4j/integrations/apache-kafka/README.md          |   8 +-
 .../apache-kafka/config/sink.properties            |  23 --
 plc4j/integrations/apache-kafka/pom.xml            |  77 ++++++
 .../apache/plc4x/kafka/Plc4xSourceConnector.java   |  91 +++----
 .../org/apache/plc4x/kafka/Plc4xSourceTask.java    | 288 ++++++++++-----------
 .../apache/plc4x/kafka/config/SourceConfig.java    |  14 +
 .../JobConfigurationTriggeredImplBuilder.java      |   2 +-
 plc4py/src/main/python/__init__.py                 |   9 -
 9 files changed, 278 insertions(+), 241 deletions(-)

diff --git a/plc4cpp/pom.xml b/plc4cpp/pom.xml
index a1f0eb5..3b1796d 100644
--- a/plc4cpp/pom.xml
+++ b/plc4cpp/pom.xml
@@ -33,12 +33,9 @@
   <name>PLC4Cpp</name>
   <description>Implementation of the protocol adapters for usage as C++ library.</description>
 
-  <!-- Disabled for now as C++ support is currently not installed in Apache SonarQube -->
-  <!--properties>
-    <sonar.language>c++</sonar.language>
-  </properties-->
-
   <properties>
+    <!-- Tell Sonar where to find the c++ sources -->
+    <sonar.sources>api/src/main/cpp,drivers/proxy/src/main/cpp,drivers/s7/src/main/cpp,protocols/driver-bases/base/src/main/cpp,protocols/s7/src/main/cpp,utils/logger/src/main/cpp,utils/systemconfig/src/main/cpp</sonar.sources>
     <option.with-proxies>OFF</option.with-proxies>
   </properties>
 
diff --git a/plc4j/integrations/apache-kafka/README.md b/plc4j/integrations/apache-kafka/README.md
index a2a073e..2c29fcb 100644
--- a/plc4j/integrations/apache-kafka/README.md
+++ b/plc4j/integrations/apache-kafka/README.md
@@ -19,16 +19,12 @@
 
 # Kafka Connect PLC4X Connector
 
-The PLC4X Connector streams data from and to any device accessible through the PLC4X interface.
+The PLC4X Connector streams data from any device accessible through the PLC4X interface.
 
 ## Source Connector
 
 See `config/source.properties` for example configuration.
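+
+A minimal standalone connector properties sketch (illustrative only: the name is made up
+and the PLC4X-specific source and job settings are omitted, see `config/source.properties`
+for the full format):
+
+    name=plc4x-source-example
+    connector.class=org.apache.plc4x.kafka.Plc4xSourceConnector
+    tasks.max=1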
 
-## Sink Connector
-
-See `config/sink.properties` for example configuration.
-
 ## Quickstart
 
 A Kafka Connect worker can be run in two modes: 
@@ -42,7 +38,7 @@ In order to start a Kafka Connect system the following steps have to be performe
 
 1) Download the latest version of Apache Kafka binaries from here: https://kafka.apache.org/downloads
 2) Unpack the archive.
-3) Copy the target/apache-kafka-0.5.0-SNAPSHOT.jar to the Kafka "libs" directory.
+3) Copy the target/apache-kafka-0.5.0-SNAPSHOT-jar-with-dependencies.jar to the Kafka "libs" directory.
 4) Copy the files in the "config" directory to Kafka's "config" directory (maybe inside a "plc4x" subdirectory), for example as sketched below.
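+
+For example, assuming Kafka was unpacked to `$KAFKA_HOME` and this module was built with
+Maven (paths are illustrative):
+
+    cp target/apache-kafka-0.5.0-SNAPSHOT-jar-with-dependencies.jar $KAFKA_HOME/libs/
+    mkdir -p $KAFKA_HOME/config/plc4x
+    cp config/*.properties $KAFKA_HOME/config/plc4x/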
 
 ### Start a Kafka Broker
diff --git a/plc4j/integrations/apache-kafka/config/sink.properties b/plc4j/integrations/apache-kafka/config/sink.properties
deleted file mode 100644
index 8da6eae..0000000
--- a/plc4j/integrations/apache-kafka/config/sink.properties
+++ /dev/null
@@ -1,23 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-name=plc-sink-test
-connector.class=org.apache.plc4x.kafka.Plc4xSinkConnector
-topics=test
-url=test:unused
-transforms=key
-transforms.key.type=org.apache.kafka.connect.transforms.ExtractField$Key
-transforms.key.field=query
\ No newline at end of file
diff --git a/plc4j/integrations/apache-kafka/pom.xml b/plc4j/integrations/apache-kafka/pom.xml
index 857f412..6dfc05e 100644
--- a/plc4j/integrations/apache-kafka/pom.xml
+++ b/plc4j/integrations/apache-kafka/pom.xml
@@ -35,6 +35,45 @@
     <kafka.version>2.3.0</kafka.version>
   </properties>
 
+  <!-- Build a fat jar so the user only needs to drop in this one jar -->
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+          </descriptorRefs>
+        </configuration>
+        <executions>
+          <execution>
+            <id>assemble-all</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
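+      <!-- The driver dependencies below are runtime-only; listing them here keeps the
+           maven-dependency-plugin's analysis from flagging them as unused declared dependencies. -->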
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <configuration>
+          <usedDependencies combine.children="append">
+            <usedDependency>org.apache.plc4x:plc4j-scraper</usedDependency>
+            <usedDependency>org.apache.plc4x:plc4j-driver-ads</usedDependency>
+            <usedDependency>org.apache.plc4x:plc4j-driver-ethernet-ip</usedDependency>
+            <usedDependency>org.apache.plc4x:plc4j-driver-modbus</usedDependency>
+            <usedDependency>org.apache.plc4x:plc4j-driver-opcua</usedDependency>
+            <usedDependency>org.apache.plc4x:plc4j-driver-s7</usedDependency>
+            <usedDependency>org.apache.plc4x:plc4j-driver-simulated</usedDependency>
+          </usedDependencies>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
   <dependencies>
     <dependency>
       <groupId>org.apache.plc4x</groupId>
@@ -64,6 +103,44 @@
       <version>${kafka.version}</version>
       <scope>provided</scope>
     </dependency>
+
+    <!-- Include all drivers -->
+    <dependency>
+      <groupId>org.apache.plc4x</groupId>
+      <artifactId>plc4j-driver-ads</artifactId>
+      <version>0.5.0-SNAPSHOT</version>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.plc4x</groupId>
+      <artifactId>plc4j-driver-ethernet-ip</artifactId>
+      <version>0.5.0-SNAPSHOT</version>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.plc4x</groupId>
+      <artifactId>plc4j-driver-modbus</artifactId>
+      <version>0.5.0-SNAPSHOT</version>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.plc4x</groupId>
+      <artifactId>plc4j-driver-opcua</artifactId>
+      <version>0.5.0-SNAPSHOT</version>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.plc4x</groupId>
+      <artifactId>plc4j-driver-s7</artifactId>
+      <version>0.5.0-SNAPSHOT</version>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.plc4x</groupId>
+      <artifactId>plc4j-driver-simulated</artifactId>
+      <version>0.5.0-SNAPSHOT</version>
+      <scope>runtime</scope>
+    </dependency>
   </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
index ffb6b35..858bd3d 100644
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
@@ -21,6 +21,9 @@ package org.apache.plc4x.kafka;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.plc4x.kafka.config.Job;
+import org.apache.plc4x.kafka.config.JobReference;
+import org.apache.plc4x.kafka.config.Source;
 import org.apache.plc4x.kafka.config.SourceConfig;
 import org.apache.plc4x.kafka.exceptions.ConfigurationException;
 import org.apache.plc4x.kafka.util.VersionUtil;
@@ -42,11 +45,14 @@ public class Plc4xSourceConnector extends SourceConnector {
         try {
             sourceConfig = SourceConfig.fromPropertyMap(props);
         } catch (ConfigurationException e) {
+            log.error("Error processing source configuration", e);
         }
     }
 
     @Override
-    public void stop() {}
+    public void stop() {
+        sourceConfig = null;
+    }
 
     @Override
     public Class<? extends Task> taskClass() {
@@ -54,59 +60,46 @@ public class Plc4xSourceConnector extends SourceConnector {
     }
 
     @Override
-    @SuppressWarnings("unchecked")
     public List<Map<String, String>> taskConfigs(int maxTasks) {
+        // For now we use the simple assumption that one task maps to one PLC connection.
+        // Alternatively one scraper instance could map to one task and a single scraper
+        // task could process multiple PLC connections, but that would be an optimization
+        // as the load would have to be balanced manually.
+        if(sourceConfig.getJobs().size() > maxTasks) {
+            // not enough tasks
+            log.warn("NOT ENOUGH TASKS!");
+            return Collections.emptyList();
+        }
+
+        // For each configured source we'll start a dedicated scraper instance collecting
+        // all the scraper jobs enabled for this source.
         List<Map<String, String>> configs = new LinkedList<>();
-        /*if (json.isEmpty()) {
-            Map<String, List<String>> groupedByHost = new HashMap<>();
-            queries.stream().map(query -> query.split("#", 2)).collect(Collectors.groupingBy(parts -> parts[0])).forEach((host, q) ->
-                groupedByHost.put(host, q.stream().map(parts -> parts[1]).collect(Collectors.toList())));
-            if (groupedByHost.size() > maxTasks) {
-                // Not enough tasks
-                // TODO: throw exception?
-                return Collections.emptyList();
-            }
-            groupedByHost.forEach((host, qs) -> {
-                Map<String, String> taskConfig = new HashMap<>();
-                taskConfig.put(Plc4xSourceTask.TOPIC_CONFIG, topic);
-                taskConfig.put(Plc4xSourceTask.URL_CONFIG, host);
-                taskConfig.put(Plc4xSourceTask.QUERIES_CONFIG, String.join(",", qs));
-                taskConfig.put(Plc4xSourceTask.RATE_CONFIG, rate.toString());
-                configs.add(taskConfig);
-            });
-        } else {
-            try {
-                // TODO
-                String config =  new Scanner(new URL(json).openStream(), StandardCharsets.UTF_8.name()).useDelimiter("\\A").next();
-                ObjectMapper mapper = new ObjectMapper();
-                Map<String, Object> values = mapper.readValue(config, new TypeReference<Map<String, Object>>() {});
-                List<Map<String, Object>> plcs = (List<Map<String, Object>>) values.get("PLCs");
-                log.info("TASKS REQUIRED: " + plcs.size());
-                if (plcs.size() > maxTasks) {
-                    // not enough tasks
-                    log.warn("NOT ENOUGH TASKS!");
-                    return Collections.emptyList();
-                }
-                for (Map<String, Object> plc : plcs) {
-                    Map<String, String> taskConfig = new HashMap<>();
-                    String ip = plc.get("IP").toString();
-                    String topic = ip;
-                    String url = "s7://" + ip + "/1/" + plc.get("Slot");
-                    List<String> queries = new LinkedList<>();
-                    for (Map<String, Object> operand : (List<Map<String, Object>>)plc.get("operands")) {
-                        String query = "%" + operand.get("Operand") + ":" + operand.get("Datatype");
-                        queries.add(query);
+        for (Source source : sourceConfig.getSources()) {
+            // Build a list of job configurations only containing the ones referenced from
+            // the current source.
+            StringBuilder query = new StringBuilder();
+            for (JobReference jobReference : source.getJobReferences()) {
+                Job job = sourceConfig.getJob(jobReference.getName());
+                if(job == null) {
+                    log.warn(String.format("Couldn't find referenced job '%s'", jobReference.getName()));
+                } else if(jobReference.isEnabled()) {
+                    query.append(",").append(jobReference.getName()).append(":").append(jobReference.getTopic());
+                    query.append(":").append(job.getInterval());
+                    for (Map.Entry<String, String> field : job.getFields().entrySet()) {
+                        String fieldName = field.getKey();
+                        String fieldAddress = field.getValue();
+                        query.append(":").append(fieldName).append("#").append(fieldAddress);
                     }
-                    taskConfig.put(Plc4xSourceTask.TOPIC_CONFIG, topic);
-                    taskConfig.put(Plc4xSourceTask.URL_CONFIG, url);
-                    taskConfig.put(RATE_CONFIG, rate.toString());
-                    taskConfig.put(Plc4xSourceTask.QUERIES_CONFIG, String.join(",", queries));
-                    configs.add(taskConfig);
                 }
-            } catch (IOException e) {
-                log.error("ERROR CONFIGURING TASK", e);
             }
-        }*/
+
+            // Create a new task configuration.
+            Map<String, String> taskConfig = new HashMap<>();
+            taskConfig.put(Plc4xSourceTask.CONNECTION_NAME_CONFIG, source.getName());
+            taskConfig.put(Plc4xSourceTask.PLC4X_CONNECTION_STRING_CONFIG, source.getConnectionString());
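+            // The query string was built with a leading ",", so strip the first character.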
+            taskConfig.put(Plc4xSourceTask.QUERIES_CONFIG, query.toString().substring(1));
+            configs.add(taskConfig);
+        }
         return configs;
     }
 
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
index dfc1c04..dc169fe 100644
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
@@ -27,13 +27,18 @@ import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.api.PlcConnection;
-import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
-import org.apache.plc4x.java.api.messages.PlcReadRequest;
-import org.apache.plc4x.java.api.messages.PlcReadResponse;
-import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.apache.plc4x.java.scraper.ResultHandler;
+import org.apache.plc4x.java.scraper.config.triggeredscraper.JobConfigurationTriggeredImplBuilder;
+import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImpl;
+import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImplBuilder;
+import org.apache.plc4x.java.scraper.exception.ScraperException;
+import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl;
+import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollector;
+import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollectorImpl;
+import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
 import org.apache.plc4x.kafka.util.VersionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.*;
 import java.util.concurrent.*;
@@ -46,46 +51,48 @@ import java.util.concurrent.*;
  */
 public class Plc4xSourceTask extends SourceTask {
 
-    static final String TOPIC_CONFIG = "topic";
-    private static final String TOPIC_DOC = "Kafka topic to publish to";
+    private static final Logger log = LoggerFactory.getLogger(Plc4xSourceTask.class);
 
-    static final String URL_CONFIG = "url";
-    private static final String URL_DOC = "PLC URL";
+    /*
+     * Config of the task.
+     */
+    static final String CONNECTION_NAME_CONFIG = "connection-name";
+    private static final String CONNECTION_NAME_STRING_DOC = "Connection Name";
+
+    static final String PLC4X_CONNECTION_STRING_CONFIG = "plc4x-connection-string";
+    private static final String PLC4X_CONNECTION_STRING_DOC = "PLC4X Connection String";
 
+    // Syntax for the queries: {job-name}:{topic}:{rate}:{field-alias}#{field-address}:{field-alias}#{field-address}...,{job-name}:{topic}:{rate}:...
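+    // Illustrative example (hypothetical job, topic and field names; field addresses are driver specific):
+    //   "job1:topic1:1000:alias1#{address-1}:alias2#{address-2},job2:topic2:500:alias3#{address-3}"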
     static final String QUERIES_CONFIG = "queries";
     private static final String QUERIES_DOC = "Field queries to be sent to the PLC";
 
-    static final String RATE_CONFIG = "rate";
-    private static final Integer RATE_DEFAULT = 1000;
-    private static final String RATE_DOC = "Polling rate";
-
     private static final ConfigDef CONFIG_DEF = new ConfigDef()
-        .define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC)
-        .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC)
-        .define(QUERIES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, QUERIES_DOC)
-        .define(RATE_CONFIG, ConfigDef.Type.INT, RATE_DEFAULT, ConfigDef.Importance.MEDIUM, RATE_DOC);
+        .define(CONNECTION_NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, CONNECTION_NAME_STRING_DOC)
+        .define(PLC4X_CONNECTION_STRING_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, PLC4X_CONNECTION_STRING_DOC)
+        .define(QUERIES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, QUERIES_DOC);
 
-    private static final long WAIT_LIMIT_MILLIS = 100;
-    private static final long TIMEOUT_LIMIT_MILLIS = 5000;
-
-    private static final String URL_FIELD = "url";
-    private static final String QUERY_FIELD = "query";
+    /*
+     * Configuration of the output.
+     */
+    private static final String SOURCE_NAME_FIELD = "source-name";
+    private static final String JOB_NAME_FIELD = "job-name";
+    private static final String FIELD_NAME_FIELD = "field-name";
 
     private static final Schema KEY_SCHEMA =
         new SchemaBuilder(Schema.Type.STRUCT)
-            .field(URL_FIELD, Schema.STRING_SCHEMA)
-            .field(QUERY_FIELD, Schema.STRING_SCHEMA)
+            .field(SOURCE_NAME_FIELD, Schema.STRING_SCHEMA)
+            .field(JOB_NAME_FIELD, Schema.STRING_SCHEMA)
+            .field(FIELD_NAME_FIELD, Schema.STRING_SCHEMA)
             .build();
 
-    private String topic;
-    private String url;
-    private List<String> queries;
-
-    private PlcConnection plcConnection;
+    // Internal properties.
+    private Map<String, String> topics;
+    private PlcDriverManager plcDriverManager;
+    private TriggerCollector triggerCollector;
+    private TriggeredScraperImpl scraper;
 
-    // TODO: should we use shared (static) thread pool for this?
-    private ScheduledExecutorService scheduler;
-    private boolean fetch = true;
+    // Internal buffer into which all incoming scraper responses are written.
+    private ArrayBlockingQueue<SourceRecord> buffer;
 
     @Override
     public String version() {
@@ -95,134 +102,119 @@ public class Plc4xSourceTask extends SourceTask {
     @Override
     public void start(Map<String, String> props) {
         AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
-        topic = config.getString(TOPIC_CONFIG);
-        url = config.getString(URL_CONFIG);
-        queries = config.getList(QUERIES_CONFIG);
-
-        openConnection();
-
-        if (!plcConnection.getMetadata().canRead()) {
-            throw new ConnectException("Reading not supported on this connection");
-        }
-
-        int rate = Integer.parseInt(props.get(RATE_CONFIG));
-        scheduler = Executors.newScheduledThreadPool(1);
-        scheduler.scheduleAtFixedRate(Plc4xSourceTask.this::scheduleFetch, rate, rate, TimeUnit.MILLISECONDS);
-    }
-
-    @Override
-    public void stop() {
-        scheduler.shutdown();
-        closeConnection();
-        synchronized (this) {
-            notify(); // wake up thread waiting in awaitFetch
-        }
-    }
-
-    @Override
-    public List<SourceRecord> poll() throws InterruptedException {
-        return awaitFetch(WAIT_LIMIT_MILLIS) ? doFetch() : null;
-    }
-
-    private void openConnection() {
-        try {
-            plcConnection = new PlcDriverManager().getConnection(url);
-            plcConnection.connect();
-        } catch (PlcConnectionException e) {
-            throw new ConnectException("Could not establish a PLC connection", e);
-        }
-    }
+        String connectionName = config.getString(CONNECTION_NAME_CONFIG);
+        String plc4xConnectionString = config.getString(PLC4X_CONNECTION_STRING_CONFIG);
+        topics = new HashMap<>();
+        // Create a buffer with a capacity of 1000 elements which schedules access in a fair way.
+        buffer = new ArrayBlockingQueue<>(1000, true);
+
+        ScraperConfigurationTriggeredImplBuilder builder = new ScraperConfigurationTriggeredImplBuilder();
+        builder.addSource(connectionName, plc4xConnectionString);
+
+        List<String> jobConfigs = config.getList(QUERIES_CONFIG);
+        for (String jobConfig : jobConfigs) {
+            String[] jobConfigSegments = jobConfig.split(":");
+            if(jobConfigSegments.length < 4) {
+                log.warn(String.format("Error in job configuration '%s'. " +
+                    "The configuration expects at least 4 segments: " +
+                    "{job-name}:{topic}:{rate}(:{field-alias}#{field-address})+", jobConfig));
+                continue;
+            }
 
-    private void closeConnection() {
-        if (plcConnection != null) {
-            try {
-                plcConnection.close();
-            } catch (Exception e) {
-                throw new PlcRuntimeException("Caught exception while closing connection to PLC", e);
+            String jobName = jobConfigSegments[0];
+            String topic = jobConfigSegments[1];
+            Integer rate = Integer.valueOf(jobConfigSegments[2]);
+            JobConfigurationTriggeredImplBuilder jobBuilder = builder.job(
+                jobName, String.format("(SCHEDULED,%s)", rate)).source(connectionName);
+            for(int i = 3; i < jobConfigSegments.length; i++) {
+                String[] fieldSegments = jobConfigSegments[i].split("#");
+                if(fieldSegments.length != 2) {
+                    log.warn(String.format("Error in job configuration '%s'. " +
+                            "The field segment expects a format {field-alias}#{field-address}, but got '%s'",
+                        jobName, jobConfigSegments[i]));
+                    continue;
+                }
+                String fieldAlias = fieldSegments[0];
+                String fieldAddress = fieldSegments[1];
+                jobBuilder.field(fieldAlias, fieldAddress);
+                topics.put(jobName, topic);
             }
         }
-    }
 
-    /**
-     * Schedule next fetch operation.
-     */
-    private synchronized void scheduleFetch() {
-        fetch = true;
-        notify();
-    }
+        ScraperConfigurationTriggeredImpl scraperConfig = builder.build();
 
-    /**
-     * Wait for next scheduled fetch operation or till the source is closed.
-     * @param milliseconds maximum time to wait
-     * @throws InterruptedException if the thread is interrupted
-     * @return true if a fetch should be performed, false otherwise
-     */
-    private synchronized boolean awaitFetch(long milliseconds) throws InterruptedException {
-        if (!fetch) {
-            wait(milliseconds);
-        }
         try {
-            return fetch;
-        } finally {
-            fetch = false;
+            plcDriverManager = new PooledPlcDriverManager();
+            triggerCollector = new TriggerCollectorImpl(plcDriverManager);
+            scraper = new TriggeredScraperImpl(scraperConfig, new ResultHandler() {
+                @Override
+                public void handle(String jobName, String sourceName, Map<String, Object> results) {
+                    Long timestamp = System.currentTimeMillis();
+
+                    Map<String, String> sourcePartition = new HashMap<>();
+                    sourcePartition.put("sourceName", sourceName);
+                    sourcePartition.put("jobName", jobName);
+
+                    Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);
+
+                    String topic = topics.get(jobName);
+
+                    for (Map.Entry<String, Object> result : results.entrySet()) {
+                        // Get field-name and -value from the results.
+                        String fieldName = result.getKey();
+                        Object fieldValue = result.getValue();
+
+                        // Prepare the key structure.
+                        Struct key = new Struct(KEY_SCHEMA)
+                            .put(SOURCE_NAME_FIELD, sourceName)
+                            .put(JOB_NAME_FIELD, jobName)
+                            .put(FIELD_NAME_FIELD, fieldName);
+
+                        // Get the schema for the given value type.
+                        Schema valueSchema = getSchema(fieldValue);
+
+                        // Prepare the source-record element.
+                        SourceRecord record =
+                            new SourceRecord(
+                                sourcePartition,
+                                sourceOffset,
+                                topic,
+                                KEY_SCHEMA,
+                                key,
+                                valueSchema,
+                                fieldValue
+                            );
+
+                        // Add the new source-record to the buffer.
+                        buffer.add(record);
+                    }
+                }
+            }, triggerCollector);
+            scraper.start();
+            triggerCollector.start();
+        } catch (ScraperException e) {
+            log.error("Error starting the scraper", e);
         }
     }
 
-    private List<SourceRecord> doFetch() throws InterruptedException {
-        final CompletableFuture<? extends PlcReadResponse> response = createReadRequest().execute();
-        try {
-            final PlcReadResponse received = response.get(TIMEOUT_LIMIT_MILLIS, TimeUnit.MILLISECONDS);
-            return extractValues(received);
-        } catch (ExecutionException e) {
-            throw new ConnectException("Could not fetch data from source", e);
-        } catch (TimeoutException e) {
-            throw new ConnectException("Timed out waiting for data from source", e);
-        }
-    }
-
-    private PlcReadRequest createReadRequest() {
-        PlcReadRequest.Builder builder = plcConnection.readRequestBuilder();
-        for (String query : queries) {
-            builder.addItem(query, query);
+    @Override
+    public void stop() {
+        synchronized (this) {
+            // TODO: Correctly shut down the scraper.
+            notifyAll(); // wake up any threads waiting on this object
         }
-        return builder.build();
     }
 
-    private List<SourceRecord> extractValues(PlcReadResponse response) {
-        final List<SourceRecord> result = new LinkedList<>();
-        for (String query : queries) {
-            final PlcResponseCode rc = response.getResponseCode(query);
-            if (!rc.equals(PlcResponseCode.OK))  {
-                continue;
-            }
-
-            Struct key = new Struct(KEY_SCHEMA)
-                .put(URL_FIELD, url)
-                .put(QUERY_FIELD, query);
-
-            Object value = response.getObject(query);
-            Schema valueSchema = getSchema(value);
-            Long timestamp = System.currentTimeMillis();
-            Map<String, String> sourcePartition = new HashMap<>();
-            sourcePartition.put("url", url);
-            sourcePartition.put("query", query);
-            Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);
-
-            SourceRecord record =
-                new SourceRecord(
-                    sourcePartition,
-                    sourceOffset,
-                    topic,
-                    KEY_SCHEMA,
-                    key,
-                    valueSchema,
-                    value
-                );
-
-            result.add(record);
+    @Override
+    public List<SourceRecord> poll() {
+        if(!buffer.isEmpty()) {
+            int numElements = buffer.size();
+            List<SourceRecord> result = new ArrayList<>(numElements);
+            buffer.drainTo(result, numElements);
+            return result;
+        } else {
+            return Collections.emptyList();
         }
-
-        return result;
     }
 
     private Schema getSchema(Object value) {
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/SourceConfig.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/SourceConfig.java
index 08d133e..91093ac 100644
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/SourceConfig.java
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/SourceConfig.java
@@ -139,8 +139,22 @@ public class SourceConfig {
         return sources;
     }
 
+    public Source getSource(String sourceName) {
+        if(sources == null) {
+            return null;
+        }
+        return sources.stream().filter(source -> source.getName().equals(sourceName)).findFirst().orElse(null);
+    }
+
     public List<Job> getJobs() {
         return jobs;
     }
 
+    public Job getJob(String jobName) {
+        if(jobs == null) {
+            return null;
+        }
+        return jobs.stream().filter(job -> job.getName().equals(jobName)).findFirst().orElse(null);
+    }
+
 }
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/triggeredscraper/JobConfigurationTriggeredImplBuilder.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/triggeredscraper/JobConfigurationTriggeredImplBuilder.java
index 48b7bd6..502e995 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/triggeredscraper/JobConfigurationTriggeredImplBuilder.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/triggeredscraper/JobConfigurationTriggeredImplBuilder.java
@@ -61,7 +61,7 @@ public class JobConfigurationTriggeredImplBuilder {
     }
 
     private JobConfigurationTriggeredImpl buildInternal() {
-        return new JobConfigurationTriggeredImpl(name, triggerConfig,null, sources, fields);
+        return new JobConfigurationTriggeredImpl(name, triggerConfig, null, sources, fields);
     }
 
     public ScraperConfigurationTriggeredImplBuilder build() {
diff --git a/plc4py/src/main/python/__init__.py b/plc4py/src/main/python/__init__.py
index 6fa4b09..c3434ab 100644
--- a/plc4py/src/main/python/__init__.py
+++ b/plc4py/src/main/python/__init__.py
@@ -14,13 +14,4 @@
 #  KIND, either express or implied.  See the License for the
 #  specific language governing permissions and limitations
 #  under the License.
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing,
-#  software distributed under the License is distributed on an
-#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-#  KIND, either express or implied.  See the License for the
-#  specific language governing permissions and limitations
-#  under the License.