You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by jf...@apache.org on 2018/11/24 20:35:56 UTC

[incubator-plc4x] 06/09: [plc4j-scraper] Current state.

This is an automated email from the ASF dual-hosted git repository.

jfeinauer pushed a commit to branch feature/plc4j-scraper
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 07886c2f6aa314e71235a948a3d3b03666067114
Author: Julian Feinauer <j....@pragmaticminds.de>
AuthorDate: Sat Nov 24 17:40:58 2018 +0100

    [plc4j-scraper] Current state.
---
 .../plc4x/java/s7/connection/S7PlcConnection.java  |   2 +-
 plc4j/utils/scraper/pom.xml                        |  15 +-
 .../org/apache/plc4x/java/scraper/Scraper.java     | 161 +++++++++++++------
 .../org/apache/plc4x/java/scraper/ScraperTask.java | 176 +++++++++++++++++++++
 .../apache/plc4x/java/scraper/ScraperTaskTest.java | 111 +++++++++++++
 .../org/apache/plc4x/java/scraper/ScraperTest.java |  78 +++++----
 .../resources/{logback.xml => logback-test.xml}    |   2 +-
 7 files changed, 466 insertions(+), 79 deletions(-)

diff --git a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/connection/S7PlcConnection.java b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/connection/S7PlcConnection.java
index a56228e..63609d5 100644
--- a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/connection/S7PlcConnection.java
+++ b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/connection/S7PlcConnection.java
@@ -242,7 +242,7 @@ public class S7PlcConnection extends NettyPlcConnection implements PlcReader, Pl
             // If the remote didn't close the connection within the given time-frame, we have to take
             // care of closing the connection.
             catch (TimeoutException e) {
-                logger.info("Remote didn't close connection within the configured timeout of {}ms, shutting down actively.", CLOSE_DEVICE_TIMEOUT_MS, e);
+                logger.debug("Remote didn't close connection within the configured timeout of {} ms, shutting down actively.", CLOSE_DEVICE_TIMEOUT_MS, e);
                 channel.close();
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
diff --git a/plc4j/utils/scraper/pom.xml b/plc4j/utils/scraper/pom.xml
index b88e8a2..9956414 100644
--- a/plc4j/utils/scraper/pom.xml
+++ b/plc4j/utils/scraper/pom.xml
@@ -49,7 +49,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-connectionString-pool</artifactId>
+      <artifactId>plc4j-connection-pool</artifactId>
       <version>0.3.0-SNAPSHOT</version>
       <scope>test</scope>
     </dependency>
@@ -59,6 +59,19 @@
       <version>0.3.0-SNAPSHOT</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
+    <!--TODO Remove this-->
+    <dependency>
+      <groupId>ch.qos.logback</groupId>
+      <artifactId>logback-classic</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-collections4</artifactId>
+    </dependency>
   </dependencies>
 
 
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
index dfd4801..90da56a 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
@@ -19,72 +19,135 @@
 
 package org.apache.plc4x.java.scraper;
 
+import org.apache.commons.collections4.MultiValuedMap;
+import org.apache.commons.collections4.multimap.ArrayListValuedHashMap;
+import org.apache.commons.lang3.Validate;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.commons.math3.exception.MathIllegalArgumentException;
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.commons.math3.stat.descriptive.UnivariateStatistic;
 import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.api.PlcConnection;
-import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
-import org.apache.plc4x.java.api.messages.PlcReadResponse;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /**
- * Plc Scraper that scrapes one source.
+ * Main class that orchestrates scraping.
  */
-public class Scraper implements Runnable {
+public class Scraper {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(Scraper.class);
 
+    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10,
+        new BasicThreadFactory.Builder()
+            .namingPattern("scheduler-thread-%d")
+            .daemon(true)
+            .build()
+    );
+    private final ExecutorService handlerPool = Executors.newFixedThreadPool(4,
+        new BasicThreadFactory.Builder()
+            .namingPattern("handler-thread-%d")
+            .daemon(true)
+            .build()
+    );
+    private final MultiValuedMap<ScrapeJob, ScraperTask> tasks = new ArrayListValuedHashMap<>();
     private final PlcDriverManager driverManager;
-    private final String connectionString;
-    private final long requestTimeoutMs;
-    private final ResultHandler handler;
+    private final List<ScrapeJob> jobs;
 
-    public Scraper(PlcDriverManager driverManager, String connectionString, long requestTimeoutMs, ResultHandler handler) {
+    public Scraper(PlcDriverManager driverManager, List<ScrapeJob> jobs) {
+        Validate.notEmpty(jobs);
         this.driverManager = driverManager;
-        this.connectionString = connectionString;
-        this.requestTimeoutMs = requestTimeoutMs;
-        this.handler = handler;
-    }
+        this.jobs = jobs;
+
+        // Schedule all jobs
+        LOGGER.info("Registering jobs...");
+        jobs.stream()
+            .flatMap(job -> job.connections.entrySet().stream()
+                .map(entry -> Triple.of(job, entry.getKey(), entry.getValue()))
+            )
+            .forEach(
+                tuple -> {
+                    LOGGER.debug("Register task for job {} for conn {} ({}) at rate {} ms",
+                        tuple.getLeft().name, tuple.getMiddle(), tuple.getRight(), tuple.getLeft().scrapeRate);
+                    ScraperTask task = new ScraperTask(driverManager,
+                        tuple.getLeft().name, tuple.getMiddle(), tuple.getRight(),
+                        tuple.getLeft().fields,
+                        1_000,
+                        handlerPool);
+                    // Add task to internal list
+                    tasks.put(tuple.getLeft(), task);
+                    scheduler.scheduleAtFixedRate(task,
+                        0, tuple.getLeft().scrapeRate, TimeUnit.MILLISECONDS);
+                }
+            );
 
-    @Override
-    public void run() {
-        // Does a single fetch
-        try (PlcConnection connection = driverManager.getConnection(connectionString)) {
-            PlcReadResponse response;
-            try {
-                response = connection.readRequestBuilder()
-                    .addItem("item1", "add1")
-                    .build()
-                    .execute()
-                    .get(requestTimeoutMs, TimeUnit.MILLISECONDS);
-            } catch (ExecutionException e) {
-                // Handle execution exception
-                handler.handleException(e);
-                return;
+        // Add statistics tracker
+        scheduler.scheduleAtFixedRate(() -> {
+            for (Map.Entry<ScrapeJob, ScraperTask> entry : tasks.entries()) {
+                DescriptiveStatistics statistics = entry.getValue().getLatencyStatistics();
+                String msg = String.format(Locale.ENGLISH, "Job statistics (%s, %s) number of requests: %d (%d success, %.1f %% failed, %.1f %% too slow), mean latency: %.2f ms, median: %.2f ms",
+                    entry.getValue().getJobName(), entry.getValue().getConnectionAlias(), entry.getValue().getRequestCounter(), entry.getValue().getSuccessfullRequestCounter(), entry.getValue().getPercentageFailed(), statistics.apply(new PercentageAboveThreshold(entry.getKey().scrapeRate*1e6)), statistics.getMean()*1e-6, statistics.getPercentile(50)*1e-6);
+                LOGGER.info(msg);
             }
-            CompletableFuture.runAsync(() -> handler.handle(transformResponseToMap(response)));
-        } catch (PlcConnectionException e) {
-            throw new PlcRuntimeException("Unable to fetch", e);
-        } catch (Exception e) {
-            throw new PlcRuntimeException("Unexpected exception during fetch", e);
-        }
+        }, 1_000, 1_000, TimeUnit.MILLISECONDS);
     }
 
-    private Map<String, Object> transformResponseToMap(PlcReadResponse response) {
-        return response.getFieldNames().stream()
-            .collect(Collectors.toMap(
-                name -> name,
-                response::getObject
-            ));
+    public static class ScrapeJob {
+
+        private final String name;
+        private final long scrapeRate;
+        /**
+         * alias -> connection-string
+         */
+        private final Map<String, String> connections;
+        /**
+         * alias -> field-query
+         */
+        private final Map<String, String> fields;
+
+        public ScrapeJob(String name, long scrapeRate, Map<String, String> connections, Map<String, String> fields) {
+            this.name = name;
+            this.scrapeRate = scrapeRate;
+            this.connections = connections;
+            this.fields = fields;
+        }
     }
 
-    public interface ResultHandler {
+    private static class PercentageAboveThreshold implements UnivariateStatistic {
+
+        private final double threshold;
 
-        void handle(Map<String, Object> result);
+        public PercentageAboveThreshold(double threshold) {
+            this.threshold = threshold;
+        }
 
-        void handleException(Exception e);
+        /** Percentage (0-100) of all {@code values} that are strictly above the threshold. */
+        @Override
+        public double evaluate(double[] values) throws MathIllegalArgumentException {
+            // Previously this counted values <= threshold and returned a 0-1 fraction, which
+            // contradicted the class name and the three-argument overload; delegate instead.
+            return evaluate(values, 0, values.length);
+        }
 
+        /** Percentage (0-100) of the {@code length} values from index {@code begin} on that exceed the threshold. */
+        @Override
+        public double evaluate(double[] values, int begin, int length) throws MathIllegalArgumentException {
+            // The window is [begin, begin + length); "length" as end index dropped data once begin > 0.
+            long above = IntStream.range(begin, begin + length).filter(i -> values[i] > threshold).count();
+            // An empty window yields NaN (0.0 / 0), same as before.
+            return 100.0*above/length;
+        }
+
+        @Override
+        public UnivariateStatistic copy() {
+            return new PercentageAboveThreshold(threshold);
+        }
     }
 }
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/ScraperTask.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/ScraperTask.java
new file mode 100644
index 0000000..9bbf5e4
--- /dev/null
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/ScraperTask.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.scraper;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.commons.lang3.time.StopWatch;
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.plc4x.java.PlcDriverManager;
+import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Plc Scraper that scrapes one source.
+ */
+public class ScraperTask implements Runnable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ScraperTask.class);
+
+    private final PlcDriverManager driverManager;
+    private final String jobName;
+    private final String connectionAlias;
+    private final String connectionString;
+    private final Map<String, String> fields;
+    private final long requestTimeoutMs;
+    private final ExecutorService handlerService;
+
+    private final AtomicLong requestCounter = new AtomicLong(0);
+    private final AtomicLong successCounter = new AtomicLong(0);
+    private final DescriptiveStatistics latencyStatistics = new DescriptiveStatistics(1000);
+    private final DescriptiveStatistics failedStatistics = new DescriptiveStatistics(1000);
+
+    public ScraperTask(PlcDriverManager driverManager, String jobName, String connectionAlias, String connectionString,
+                       Map<String, String> fields, long requestTimeoutMs, ExecutorService handlerService) {
+        Validate.notNull(driverManager);
+        Validate.notBlank(jobName);
+        Validate.notBlank(connectionAlias);
+        Validate.notBlank(connectionString);
+        Validate.notEmpty(fields);
+        Validate.isTrue(requestTimeoutMs > 0);
+        this.driverManager = driverManager;
+        this.jobName = jobName;
+        this.connectionAlias = connectionAlias;
+        this.connectionString = connectionString;
+        this.fields = fields;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.handlerService = handlerService;
+    }
+
+    /**
+     * Performs one scrape: connects, reads all configured fields, records latency/failure statistics and
+     * passes the result to {@link #handle(Map)} on the handler executor; exceptions go to {@link #handleException}.
+     */
+    @Override
+    public void run() {
+        LOGGER.trace("Start new scrape of task of job {} for connection {}", jobName, connectionAlias);
+        requestCounter.incrementAndGet();
+        StopWatch stopWatch = new StopWatch();
+        stopWatch.start();
+        try (PlcConnection connection = driverManager.getConnection(connectionString)) {
+            LOGGER.trace("Connection to {} established: {}", connectionString, connection);
+            PlcReadResponse response;
+            try {
+                PlcReadRequest.Builder builder = connection.readRequestBuilder();
+                fields.forEach((alias,qry) -> {
+                    LOGGER.trace("Requesting: {} -> {}", alias, qry);
+                    builder.addItem(alias,qry);
+                });
+                response = builder.build().execute().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
+            } catch (ExecutionException e) {
+                handleException(e);
+                return;
+            }
+            stopWatch.stop();
+            latencyStatistics.addValue(stopWatch.getNanoTime());
+            failedStatistics.addValue(0.0);
+            successCounter.incrementAndGet();
+            validateResponse(response);
+            CompletableFuture.runAsync(() -> handle(transformResponseToMap(response)), handlerService);
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) {
+                Thread.currentThread().interrupt(); // restore the flag instead of swallowing the interrupt
+            }
+            // handleException records the failure; counting it here as well doubled every failure.
+            LOGGER.debug("Exception during scrape", e);
+            handleException(e);
+        }
+    }
+
+    private void validateResponse(PlcReadResponse response) {
+        Map<String, PlcResponseCode> failedFields = response.getFieldNames().stream()
+            .filter(name -> !PlcResponseCode.OK.equals(response.getResponseCode(name)))
+            .collect(Collectors.toMap(
+                Function.identity(),
+                response::getResponseCode
+            ));
+        if (failedFields.size() > 0) {
+            handleErrorResponse(failedFields);
+        }
+    }
+
+    private Map<String, Object> transformResponseToMap(PlcReadResponse response) {
+        return response.getFieldNames().stream()
+            .collect(Collectors.toMap(
+                name -> name,
+                response::getObject
+            ));
+    }
+
+    public String getJobName() {
+        return jobName;
+    }
+
+    public String getConnectionAlias() {
+        return connectionAlias;
+    }
+
+    public long getRequestCounter() {
+        return requestCounter.get();
+    }
+
+    public long getSuccessfullRequestCounter() {
+        return successCounter.get();
+    }
+
+    public DescriptiveStatistics getLatencyStatistics() {
+        return latencyStatistics;
+    }
+
+    public double getPercentageFailed() {
+        return 100.0*failedStatistics.getMean();
+    }
+
+    public void handle(Map<String, Object> result) {
+        LOGGER.debug("Handling result on gorgeous pool: {}", result);
+    }
+
+    public void handleException(Exception e) {
+        failedStatistics.addValue(1.0);
+    }
+
+    public void handleErrorResponse(Map<String, PlcResponseCode> failed) {
+        LOGGER.warn("Handling error responses: {}", failed);
+    }
+
+}
diff --git a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTaskTest.java b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTaskTest.java
new file mode 100644
index 0000000..546f3cb
--- /dev/null
+++ b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTaskTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.scraper;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.plc4x.java.PlcDriverManager;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.apache.plc4x.java.base.messages.items.DefaultStringFieldItem;
+import org.apache.plc4x.java.mock.MockDevice;
+import org.apache.plc4x.java.mock.PlcMockConnection;
+import org.assertj.core.api.WithAssertions;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.*;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.*;
+
+@ExtendWith(MockitoExtension.class)
+public class ScraperTaskTest implements WithAssertions {
+
+    @Mock
+    MockDevice mockDevice;
+
+    @Test
+    public void scrape() throws PlcConnectionException {
+        PlcDriverManager driverManager = new PlcDriverManager();
+        PlcMockConnection connection = (PlcMockConnection) driverManager.getConnection("mock:scraper");
+        connection.setDevice(mockDevice);
+        when(mockDevice.read(any())).thenReturn(Pair.of(PlcResponseCode.OK, new DefaultStringFieldItem("hallo")));
+
+        ScraperTask scraperTask = new ScraperTask(driverManager, "job1", "m1", "mock:scraper", Collections.singletonMap("a", "b"),
+            1_000, ForkJoinPool.commonPool());
+
+        scraperTask.run();
+    }
+
+    @Nested
+    class Exceptions {
+
+        @Test
+        public void badResponseCode_shouldHandleException() throws PlcConnectionException {
+            // Given
+            PlcDriverManager driverManager = new PlcDriverManager();
+            PlcMockConnection connection = (PlcMockConnection) driverManager.getConnection("mock:scraper");
+            connection.setDevice(mockDevice);
+            when(mockDevice.read(any())).thenReturn(Pair.of(PlcResponseCode.NOT_FOUND, new DefaultStringFieldItem("hallo")));
+
+            ScraperTask scraperTask = new ScraperTask(driverManager, "job1", "m1",
+                "mock:scraper", Collections.singletonMap("a", "b"), 1_000, ForkJoinPool.commonPool());
+
+            // When
+            scraperTask.run();
+        }
+
+        @Mock
+        PlcDriverManager driverManager;
+
+        @Test
+        public void handleConnectionException() throws PlcConnectionException {
+            // Given
+            when(driverManager.getConnection(anyString())).thenThrow(new PlcConnectionException("stfu"));
+
+            ScraperTask scraperTask = new ScraperTask(driverManager, "job1", "m1", "mock:scraper", Collections.singletonMap("a", "b"),
+                1_000, ForkJoinPool.commonPool());
+
+            // When / Then: run() catches every exception itself, so none may escape; the failure is only recorded.
+            assertThatCode(scraperTask::run).doesNotThrowAnyException();
+            assertThat(scraperTask.getPercentageFailed()).isEqualTo(100.0);
+        }
+
+        @Test
+        void runByScheduler_handledGracefully() throws PlcConnectionException {
+            when(driverManager.getConnection(anyString())).thenThrow(new PlcConnectionException("stfu"));
+            ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
+            ScraperTask scraperTask = new ScraperTask(driverManager, "job1", "m1", "mock:scraper", Collections.singletonMap("a", "b"),
+                1_000, ForkJoinPool.commonPool());
+
+            Future<?> future = pool.scheduleAtFixedRate(scraperTask, 0, 10, TimeUnit.MILLISECONDS);
+
+            assertThat(future).isNotDone();
+        }
+
+    }
+}
\ No newline at end of file
diff --git a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java
index 25b526a..3a25f34 100644
--- a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java
+++ b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java
@@ -20,57 +20,81 @@
 package org.apache.plc4x.java.scraper;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.pool2.KeyedObjectPool;
 import org.apache.plc4x.java.PlcDriverManager;
+import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.types.PlcResponseCode;
-import org.apache.plc4x.java.base.messages.items.DefaultStringFieldItem;
+import org.apache.plc4x.java.base.messages.items.DefaultIntegerFieldItem;
 import org.apache.plc4x.java.mock.MockDevice;
 import org.apache.plc4x.java.mock.PlcMockConnection;
-import org.junit.Test;
-import org.mockito.Mockito;
+import org.apache.plc4x.java.utils.connectionpool.PoolKey;
+import org.apache.plc4x.java.utils.connectionpool.PooledPlcConnectionFactory;
+import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
 
-import java.util.Map;
+import java.util.Arrays;
+import java.util.Collections;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.when;
 
-public class ScraperTest {
+@ExtendWith(MockitoExtension.class)
+class ScraperTest {
 
-    @Test
-    public void scrape() throws PlcConnectionException {
-        PlcDriverManager driverManager = new PlcDriverManager();
-        PlcMockConnection connection = (PlcMockConnection) driverManager.getConnection("mock:scraper");
-        MockDevice mockDevice = Mockito.mock(MockDevice.class);
-        connection.setDevice(mockDevice);
-        when(mockDevice.read(any())).thenReturn(Pair.of(PlcResponseCode.OK, new DefaultStringFieldItem("hallo")));
+    @Mock
+    MockDevice mockDevice;
 
-        Scraper scraper = new Scraper(driverManager, "mock:scraper", 1_000, new Scraper.ResultHandler() {
-            @Override
-            public void handle(Map<String, Object> result) {
-                System.out.println(result);
-            }
+    public static final String CONN_STRING_TIM = "s7://10.10.64.22/0/1";
+    public static final String FIELD_STRING_TIM = "%DB225:DBW0:INT";
+
+        public static final String CONN_STRING_CH = "s7://10.10.64.20/0/1";
+    public static final String FIELD_STRING_CH = "%DB3:DBD32:DINT";
 
+    @Test
+    void real_stuff() throws InterruptedException {
+        PlcDriverManager driverManager = new PooledPlcDriverManager(new PooledPlcDriverManager.PoolCreator() {
             @Override
-            public void handleException(Exception e) {
-                System.err.println(e);
+            public KeyedObjectPool<PoolKey, PlcConnection> createPool(PooledPlcConnectionFactory pooledPlcConnectionFactory) {
+                return null;
             }
         });
 
-        scraper.run();
+        Scraper scraper = new Scraper(driverManager, Arrays.asList(
+            new Scraper.ScrapeJob("job1",
+                10,
+                Collections.singletonMap("tim", CONN_STRING_TIM),
+                Collections.singletonMap("distance", FIELD_STRING_TIM)
+            ),
+            new Scraper.ScrapeJob("job2",
+                10,
+                Collections.singletonMap("chris", CONN_STRING_CH),
+                Collections.singletonMap("counter", FIELD_STRING_CH)
+            )
+        ));
+
+        Thread.sleep(300_000);
     }
 
     @Test
-    public void scrape_badResponseCode_shouldHandleException() throws PlcConnectionException {
+    void scraper_schedulesJob() throws InterruptedException, PlcConnectionException {
         PlcDriverManager driverManager = new PlcDriverManager();
-        PlcMockConnection connection = (PlcMockConnection) driverManager.getConnection("mock:scraper");
-        MockDevice mockDevice = Mockito.mock(MockDevice.class);
+        PlcMockConnection connection = (PlcMockConnection) driverManager.getConnection("mock:m1");
         connection.setDevice(mockDevice);
-        when(mockDevice.read(any())).thenReturn(Pair.of(PlcResponseCode.NOT_FOUND, new DefaultStringFieldItem("hallo")));
 
-        Scraper.ResultHandler handler = Mockito.mock(Scraper.ResultHandler.class);
+        when(mockDevice.read(any())).thenReturn(Pair.of(PlcResponseCode.OK, new DefaultIntegerFieldItem(1)));
 
-        Scraper scraper = new Scraper(driverManager, "mock:scraper", 1_000, null);
+        Scraper scraper = new Scraper(driverManager, Collections.singletonList(
+            new Scraper.ScrapeJob("job1",
+                10,
+                Collections.singletonMap("m1", "mock:m1"),
+                Collections.singletonMap("field1", "qry1")
+            )
+        ));
 
-        scraper.run();
+        Thread.sleep(5_000);
     }
 }
\ No newline at end of file
diff --git a/plc4j/utils/scraper/src/test/resources/logback.xml b/plc4j/utils/scraper/src/test/resources/logback-test.xml
similarity index 97%
rename from plc4j/utils/scraper/src/test/resources/logback.xml
rename to plc4j/utils/scraper/src/test/resources/logback-test.xml
index f0f2b2e..c562020 100644
--- a/plc4j/utils/scraper/src/test/resources/logback.xml
+++ b/plc4j/utils/scraper/src/test/resources/logback-test.xml
@@ -29,7 +29,7 @@
     </encoder>
   </appender>
 
-  <root level="ERROR">
+  <root level="INFO">
     <appender-ref ref="STDOUT"/>
   </root>