You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by jf...@apache.org on 2018/11/24 20:35:50 UTC

[incubator-plc4x] branch feature/plc4j-scraper updated (1e3bfee -> 78e1d31)

This is an automated email from the ASF dual-hosted git repository.

jfeinauer pushed a change to branch feature/plc4j-scraper
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git.


 discard 1e3bfee  [plc4j-scraper] Increased tests coverage for Scraper.java.
 discard 02fd056  [plc4x] enable global osgi manifest generation
 discard 258af1a  [plc4j-opm] Minor test coverage.
 discard 99d8f27  [plc4j-driver-simulated] added tests and fixed bugs.
 discard e8bf82c  [plc4j-driver-bases-test] enabled some tests.
 discard 58a00fa  [plc4j-driver-bases-test] added some tests.
 discard f93cf4e  [plc4j] cleanup mock code and moved it completely to driver-bases-test
 discard c4e4f88  [plc4j-opm] added some tests for entity interceptor
 discard 57b707b  [plc4j-scraper] Working state.
 discard 6b8bddf  [plc4j-scraper] Fix for S7 Connection. Further implementation.
 discard 2fcd8bd  [plc4j-scraper] Current state.
 discard c91a4cd  [plc4j-scraper] Added tests.
 discard 7da0521  [plc4j-opm] fixed tests
 discard eebf860  [plc4j-opm] fixed some sonar issues.
 discard 73f9b08  [plc4j-opm] deactivate caching by default and fixed test
 discard b16cc9b  [plc4j-opm] fixed issue with detached entity.
 discard 760c78e  [plc4j-pool] Fixed visibility of Interface.
 discard fafbc79  [OPM] Added OPM Documentation.
 discard f6c1572  [plc4j-opm] enable write support (PLC4X-70)
 discard 18f1fd8  [plc4j-scraper] Improvements.
 discard 0504815  [plc4j-scraper] Added a tet with "guarding" runnable that cancels tests and decreases performance massive.
 discard c49db19  [plc4j-scraper] Added another test + logback.
 discard eba8955  [plc4j-scraper] Added module, initial tests.
     add 6fa99c2  [plc4j-opm] enable write support (PLC4X-70)
     add 78c7562  [OPM] Added OPM Documentation.
     add abcda3b  [plc4j-pool] Fixed visibility of Interface.
     add 90b7f10  [plc4j-opm] fixed issue with detached entity.
     add 8013c09  [plc4j-opm] deactivate caching by default and fixed test
     add 70445ae  [plc4j-opm] fixed some sonar issues.
     add d1bcb67  [plc4j-opm] fixed tests
     add f29f791  [plc4j-opm] added some tests for entity interceptor
     add 11e4e8f  [plc4j] cleanup mock code and moved it completely to driver-bases-test
     add b7c907f  [plc4j-driver-bases-test] added some tests.
     add 2852c41  [plc4j-driver-bases-test] enabled some tests.
     add 33d5983  [plc4j-driver-simulated] added tests and fixed bugs.
     add 8955a23  [plc4j-opm] Minor test coverage.
     add 2b4dfb6  [plc4x] enable global osgi manifest generation
     new 1e2263f  [plc4j-scraper] Added module, initial tests.
     new 3fabc85  [plc4j-scraper] Added another test + logback.
     new 95cde8e  [plc4j-scraper] Added a tet with "guarding" runnable that cancels tests and decreases performance massive.
     new 57efa48  [plc4j-scraper] Improvements.
     new 79c06aa  [plc4j-scraper] Added tests.
     new 07886c2  [plc4j-scraper] Current state.
     new 1463e49  [plc4j-scraper] Fix for S7 Connection. Further implementation.
     new ad3a136  [plc4j-scraper] Working state.
     new 78e1d31  [plc4j-scraper] Increased tests coverage for Scraper.java.

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (1e3bfee)
            \
             N -- N -- N   refs/heads/feature/plc4j-scraper (78e1d31)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:


[incubator-plc4x] 03/09: [plc4j-scraper] Added a tet with "guarding" runnable that cancels tests and decreases performance massive.

Posted by jf...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jfeinauer pushed a commit to branch feature/plc4j-scraper
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 95cde8e8ea8c51de9ab54d38cd3bf989a3fdb848
Author: Julian Feinauer <j....@pragmaticminds.de>
AuthorDate: Sat Nov 24 10:03:57 2018 +0100

    [plc4j-scraper] Added a tet with "guarding" runnable that cancels tests and decreases performance massive.
---
 .../apache/plc4x/java/s7/ManualS7PlcDriverMT.java  | 60 +++++++++++++++++++---
 1 file changed, 53 insertions(+), 7 deletions(-)

diff --git a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/s7/ManualS7PlcDriverMT.java b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/s7/ManualS7PlcDriverMT.java
index 868b1b8..54770a7 100644
--- a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/s7/ManualS7PlcDriverMT.java
+++ b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/s7/ManualS7PlcDriverMT.java
@@ -22,6 +22,7 @@ package org.apache.plc4x.java.s7;
 import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
 import org.apache.plc4x.java.PlcDriverManager;
 import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
 import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
 import org.junit.Test;
@@ -34,11 +35,11 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 public class ManualS7PlcDriverMT {
 
-//    public static final String CONN_STRING = "s7://10.10.64.22/0/1";
-//    public static final String FIELD_STRING = "%DB225:DBW0:INT";
+    public static final String CONN_STRING = "s7://10.10.64.22/0/1";
+    public static final String FIELD_STRING = "%DB225:DBW0:INT";
 
-    public static final String CONN_STRING = "s7://10.10.64.20/0/1";
-    public static final String FIELD_STRING = "%DB3:DBD32:DINT";
+//    public static final String CONN_STRING = "s7://10.10.64.20/0/1";
+//    public static final String FIELD_STRING = "%DB3:DBD32:DINT";
 
     @Test
     public void simpleLoop() {
@@ -75,6 +76,50 @@ public class ManualS7PlcDriverMT {
         executorService.awaitTermination(100, TimeUnit.SECONDS);
     }
 
+    @Test
+    public void scheduledCancellingLoop() throws InterruptedException, PlcConnectionException {
+        PlcDriverManager plcDriverManager = new PooledPlcDriverManager();
+        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
+        DescriptiveStatistics statistics = new DescriptiveStatistics();
+
+        final int period = 10;
+        int numberOfRuns = 1000;
+        AtomicInteger counter = new AtomicInteger(0);
+
+        // Warmup
+        plcDriverManager.getConnection(CONN_STRING);
+
+        Runnable iteration = new Runnable() {
+            @Override
+            public void run() {
+                System.out.println("Setting a request / guard...");
+                CompletableFuture<Double> requestFuture = CompletableFuture.supplyAsync(
+                    () -> ManualS7PlcDriverMT.this.runSingleRequest(plcDriverManager)
+                );
+                executorService.schedule(() -> {
+                    if (!requestFuture.isDone()) {
+                        requestFuture.cancel(true);
+                        System.err.println("Cancel a future!");
+                    } else {
+                        System.out.println("Request finished successfully");
+                        try {
+                            statistics.addValue(requestFuture.get());
+                        } catch (InterruptedException | ExecutionException e) {
+                            // do nothing...
+                        }
+                    }
+                    if (counter.getAndIncrement() >= numberOfRuns) {
+                        executorService.shutdown();
+                        ManualS7PlcDriverMT.this.printStatistics(statistics);
+                    }
+                }, period, TimeUnit.MILLISECONDS);
+            }
+        };
+
+        executorService.scheduleAtFixedRate(iteration, 0, period, TimeUnit.MILLISECONDS);
+        executorService.awaitTermination(100, TimeUnit.SECONDS);
+    }
+
     private double runSingleRequest(PlcDriverManager plcDriverManager) {
         long start = System.nanoTime();
         try (PlcConnection connection = plcDriverManager.getConnection(CONN_STRING)) {
@@ -89,12 +134,13 @@ public class ManualS7PlcDriverMT {
             e.printStackTrace();
         }
         long end = System.nanoTime();
-        return (double)end-start;
+        return (double) end - start;
     }
 
     private void printStatistics(DescriptiveStatistics statistics) {
-        System.out.println("Mean response time: " + TimeUnit.NANOSECONDS.toMillis((long)statistics.getMean()) + " ms");
-        System.out.println("Median response time: " + TimeUnit.NANOSECONDS.toMillis((long)statistics.getPercentile(50)) + " ms");
+        System.out.println("Number of responses: " + statistics.getN());
+        System.out.println("Mean response time: " + TimeUnit.NANOSECONDS.toMillis((long) statistics.getMean()) + " ms");
+        System.out.println("Median response time: " + TimeUnit.NANOSECONDS.toMillis((long) statistics.getPercentile(50)) + " ms");
         for (int i = 10; i <= 90; i += 10) {
             System.out.println(String.format(Locale.ENGLISH, "Percentile %3d %%: %5d ms", i, TimeUnit.NANOSECONDS.toMillis((long) statistics.getPercentile(i))));
         }


[incubator-plc4x] 04/09: [plc4j-scraper] Improvements.

Posted by jf...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jfeinauer pushed a commit to branch feature/plc4j-scraper
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 57efa48226070eeb5b231d12de6975c2eb1ef549
Author: Julian Feinauer <j....@pragmaticminds.de>
AuthorDate: Sat Nov 24 10:11:44 2018 +0100

    [plc4j-scraper] Improvements.
---
 .../apache/plc4x/java/s7/ManualS7PlcDriverMT.java  | 28 +++++++++++++++-------
 plc4j/utils/scraper/src/test/resources/logback.xml |  2 +-
 2 files changed, 21 insertions(+), 9 deletions(-)

diff --git a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/s7/ManualS7PlcDriverMT.java b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/s7/ManualS7PlcDriverMT.java
index 54770a7..073c221 100644
--- a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/s7/ManualS7PlcDriverMT.java
+++ b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/s7/ManualS7PlcDriverMT.java
@@ -27,20 +27,23 @@ import org.apache.plc4x.java.api.messages.PlcReadResponse;
 import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
 import org.junit.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Locale;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
 
 public class ManualS7PlcDriverMT {
 
     public static final String CONN_STRING = "s7://10.10.64.22/0/1";
     public static final String FIELD_STRING = "%DB225:DBW0:INT";
 
+
 //    public static final String CONN_STRING = "s7://10.10.64.20/0/1";
 //    public static final String FIELD_STRING = "%DB3:DBD32:DINT";
-
     @Test
     public void simpleLoop() {
         PlcDriverManager plcDriverManager = new PooledPlcDriverManager();
@@ -76,14 +79,23 @@ public class ManualS7PlcDriverMT {
         executorService.awaitTermination(100, TimeUnit.SECONDS);
     }
 
-    @Test
-    public void scheduledCancellingLoop() throws InterruptedException, PlcConnectionException {
+    private static Stream<Arguments> periodAndRus() {
+        return Stream.of(
+            Arguments.of(10, 100),
+            Arguments.of(10, 1000),
+            Arguments.of(100, 100),
+            Arguments.of(100, 1000)
+            );
+    }
+
+    @ParameterizedTest
+    @MethodSource("periodAndRus")
+    public void scheduledCancellingLoop(int period, int numberOfRuns) throws InterruptedException, PlcConnectionException {
+        System.out.println("Starting iteration with period " + period + " and " + numberOfRuns + " runs.");
         PlcDriverManager plcDriverManager = new PooledPlcDriverManager();
         ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
         DescriptiveStatistics statistics = new DescriptiveStatistics();
 
-        final int period = 10;
-        int numberOfRuns = 1000;
         AtomicInteger counter = new AtomicInteger(0);
 
         // Warmup
@@ -92,16 +104,16 @@ public class ManualS7PlcDriverMT {
         Runnable iteration = new Runnable() {
             @Override
             public void run() {
-                System.out.println("Setting a request / guard...");
+//                System.out.println("Setting a request / guard...");
                 CompletableFuture<Double> requestFuture = CompletableFuture.supplyAsync(
                     () -> ManualS7PlcDriverMT.this.runSingleRequest(plcDriverManager)
                 );
                 executorService.schedule(() -> {
                     if (!requestFuture.isDone()) {
                         requestFuture.cancel(true);
-                        System.err.println("Cancel a future!");
+                        System.out.print("!");
                     } else {
-                        System.out.println("Request finished successfully");
+                        System.out.print(".");
                         try {
                             statistics.addValue(requestFuture.get());
                         } catch (InterruptedException | ExecutionException e) {
diff --git a/plc4j/utils/scraper/src/test/resources/logback.xml b/plc4j/utils/scraper/src/test/resources/logback.xml
index c562020..f0f2b2e 100644
--- a/plc4j/utils/scraper/src/test/resources/logback.xml
+++ b/plc4j/utils/scraper/src/test/resources/logback.xml
@@ -29,7 +29,7 @@
     </encoder>
   </appender>
 
-  <root level="INFO">
+  <root level="ERROR">
     <appender-ref ref="STDOUT"/>
   </root>
 


[incubator-plc4x] 06/09: [plc4j-scraper] Current state.

Posted by jf...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jfeinauer pushed a commit to branch feature/plc4j-scraper
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 07886c2f6aa314e71235a948a3d3b03666067114
Author: Julian Feinauer <j....@pragmaticminds.de>
AuthorDate: Sat Nov 24 17:40:58 2018 +0100

    [plc4j-scraper] Current state.
---
 .../plc4x/java/s7/connection/S7PlcConnection.java  |   2 +-
 plc4j/utils/scraper/pom.xml                        |  15 +-
 .../org/apache/plc4x/java/scraper/Scraper.java     | 161 +++++++++++++------
 .../org/apache/plc4x/java/scraper/ScraperTask.java | 176 +++++++++++++++++++++
 .../apache/plc4x/java/scraper/ScraperTaskTest.java | 111 +++++++++++++
 .../org/apache/plc4x/java/scraper/ScraperTest.java |  78 +++++----
 .../resources/{logback.xml => logback-test.xml}    |   2 +-
 7 files changed, 466 insertions(+), 79 deletions(-)

diff --git a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/connection/S7PlcConnection.java b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/connection/S7PlcConnection.java
index a56228e..63609d5 100644
--- a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/connection/S7PlcConnection.java
+++ b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/connection/S7PlcConnection.java
@@ -242,7 +242,7 @@ public class S7PlcConnection extends NettyPlcConnection implements PlcReader, Pl
             // If the remote didn't close the connection within the given time-frame, we have to take
             // care of closing the connection.
             catch (TimeoutException e) {
-                logger.info("Remote didn't close connection within the configured timeout of {}ms, shutting down actively.", CLOSE_DEVICE_TIMEOUT_MS, e);
+                logger.debug("Remote didn't close connection within the configured timeout of {} ms, shutting down actively.", CLOSE_DEVICE_TIMEOUT_MS, e);
                 channel.close();
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
diff --git a/plc4j/utils/scraper/pom.xml b/plc4j/utils/scraper/pom.xml
index b88e8a2..9956414 100644
--- a/plc4j/utils/scraper/pom.xml
+++ b/plc4j/utils/scraper/pom.xml
@@ -49,7 +49,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-connectionString-pool</artifactId>
+      <artifactId>plc4j-connection-pool</artifactId>
       <version>0.3.0-SNAPSHOT</version>
       <scope>test</scope>
     </dependency>
@@ -59,6 +59,19 @@
       <version>0.3.0-SNAPSHOT</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
+    <!--TODO Remove this-->
+    <dependency>
+      <groupId>ch.qos.logback</groupId>
+      <artifactId>logback-classic</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-collections4</artifactId>
+    </dependency>
   </dependencies>
 
 
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
index dfd4801..90da56a 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
@@ -19,72 +19,135 @@
 
 package org.apache.plc4x.java.scraper;
 
+import org.apache.commons.collections4.MultiValuedMap;
+import org.apache.commons.collections4.multimap.ArrayListValuedHashMap;
+import org.apache.commons.lang3.Validate;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.commons.math3.exception.MathIllegalArgumentException;
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.commons.math3.stat.descriptive.UnivariateStatistic;
 import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.api.PlcConnection;
-import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
-import org.apache.plc4x.java.api.messages.PlcReadResponse;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /**
- * Plc Scraper that scrapes one source.
+ * Main class that orchestrates scraping.
  */
-public class Scraper implements Runnable {
+public class Scraper {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(Scraper.class);
 
+    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10,
+        new BasicThreadFactory.Builder()
+            .namingPattern("scheduler-thread-%d")
+            .daemon(true)
+            .build()
+    );
+    private final ExecutorService handlerPool = Executors.newFixedThreadPool(4,
+        new BasicThreadFactory.Builder()
+            .namingPattern("handler-thread-%d")
+            .daemon(true)
+            .build()
+    );
+    private final MultiValuedMap<ScrapeJob, ScraperTask> tasks = new ArrayListValuedHashMap<>();
     private final PlcDriverManager driverManager;
-    private final String connectionString;
-    private final long requestTimeoutMs;
-    private final ResultHandler handler;
+    private final List<ScrapeJob> jobs;
 
-    public Scraper(PlcDriverManager driverManager, String connectionString, long requestTimeoutMs, ResultHandler handler) {
+    public Scraper(PlcDriverManager driverManager, List<ScrapeJob> jobs) {
+        Validate.notEmpty(jobs);
         this.driverManager = driverManager;
-        this.connectionString = connectionString;
-        this.requestTimeoutMs = requestTimeoutMs;
-        this.handler = handler;
-    }
+        this.jobs = jobs;
+
+        // Schedule all jobs
+        LOGGER.info("Registering jobs...");
+        jobs.stream()
+            .flatMap(job -> job.connections.entrySet().stream()
+                .map(entry -> Triple.of(job, entry.getKey(), entry.getValue()))
+            )
+            .forEach(
+                tuple -> {
+                    LOGGER.debug("Register task for job {} for conn {} ({}) at rate {} ms",
+                        tuple.getLeft().name, tuple.getMiddle(), tuple.getRight(), tuple.getLeft().scrapeRate);
+                    ScraperTask task = new ScraperTask(driverManager,
+                        tuple.getLeft().name, tuple.getMiddle(), tuple.getRight(),
+                        tuple.getLeft().fields,
+                        1_000,
+                        handlerPool);
+                    // Add task to internal list
+                    tasks.put(tuple.getLeft(), task);
+                    scheduler.scheduleAtFixedRate(task,
+                        0, tuple.getLeft().scrapeRate, TimeUnit.MILLISECONDS);
+                }
+            );
 
-    @Override
-    public void run() {
-        // Does a single fetch
-        try (PlcConnection connection = driverManager.getConnection(connectionString)) {
-            PlcReadResponse response;
-            try {
-                response = connection.readRequestBuilder()
-                    .addItem("item1", "add1")
-                    .build()
-                    .execute()
-                    .get(requestTimeoutMs, TimeUnit.MILLISECONDS);
-            } catch (ExecutionException e) {
-                // Handle execution exception
-                handler.handleException(e);
-                return;
+        // Add statistics tracker
+        scheduler.scheduleAtFixedRate(() -> {
+            for (Map.Entry<ScrapeJob, ScraperTask> entry : tasks.entries()) {
+                DescriptiveStatistics statistics = entry.getValue().getLatencyStatistics();
+                String msg = String.format(Locale.ENGLISH, "Job statistics (%s, %s) number of requests: %d (%d success, %.1f %% failed, %.1f %% too slow), mean latency: %.2f ms, median: %.2f ms",
+                    entry.getValue().getJobName(), entry.getValue().getConnectionAlias(), entry.getValue().getRequestCounter(), entry.getValue().getSuccessfullRequestCounter(), entry.getValue().getPercentageFailed(), statistics.apply(new PercentageAboveThreshold(entry.getKey().scrapeRate*1e6)), statistics.getMean()*1e-6, statistics.getPercentile(50)*1e-6);
+                LOGGER.info(msg);
             }
-            CompletableFuture.runAsync(() -> handler.handle(transformResponseToMap(response)));
-        } catch (PlcConnectionException e) {
-            throw new PlcRuntimeException("Unable to fetch", e);
-        } catch (Exception e) {
-            throw new PlcRuntimeException("Unexpected exception during fetch", e);
-        }
+        }, 1_000, 1_000, TimeUnit.MILLISECONDS);
     }
 
-    private Map<String, Object> transformResponseToMap(PlcReadResponse response) {
-        return response.getFieldNames().stream()
-            .collect(Collectors.toMap(
-                name -> name,
-                response::getObject
-            ));
+    public static class ScrapeJob {
+
+        private final String name;
+        private final long scrapeRate;
+        /**
+         * alias -> connection-string
+         */
+        private final Map<String, String> connections;
+        /**
+         * alias -> field-query
+         */
+        private final Map<String, String> fields;
+
+        public ScrapeJob(String name, long scrapeRate, Map<String, String> connections, Map<String, String> fields) {
+            this.name = name;
+            this.scrapeRate = scrapeRate;
+            this.connections = connections;
+            this.fields = fields;
+        }
     }
 
-    public interface ResultHandler {
+    private static class PercentageAboveThreshold implements UnivariateStatistic {
+
+        private final double threshold;
 
-        void handle(Map<String, Object> result);
+        public PercentageAboveThreshold(double threshold) {
+            this.threshold = threshold;
+        }
 
-        void handleException(Exception e);
+        @Override
+        public double evaluate(double[] values) throws MathIllegalArgumentException {
+            long below = Arrays.stream(values)
+                .filter(val -> val <= threshold)
+                .count();
+            return (double)below/values.length;
+        }
 
+        @Override
+        public double evaluate(double[] values, int begin, int length) throws MathIllegalArgumentException {
+            long below = IntStream.range(begin, length)
+                .mapToDouble(i -> values[i])
+                .filter(val -> val > threshold)
+                .count();
+            return 100.0*below/length;
+        }
+
+        @Override
+        public UnivariateStatistic copy() {
+            return new PercentageAboveThreshold(threshold);
+        }
     }
 }
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/ScraperTask.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/ScraperTask.java
new file mode 100644
index 0000000..9bbf5e4
--- /dev/null
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/ScraperTask.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.scraper;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.commons.lang3.time.StopWatch;
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.plc4x.java.PlcDriverManager;
+import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Plc Scraper that scrapes one source.
+ */
+public class ScraperTask implements Runnable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ScraperTask.class);
+
+    private final PlcDriverManager driverManager;
+    private final String jobName;
+    private final String connectionAlias;
+    private final String connectionString;
+    private final Map<String, String> fields;
+    private final long requestTimeoutMs;
+    private final ExecutorService handlerService;
+
+    private final AtomicLong requestCounter = new AtomicLong(0);
+    private final AtomicLong successCounter = new AtomicLong(0);
+    private final DescriptiveStatistics latencyStatistics = new DescriptiveStatistics(1000);
+    private final DescriptiveStatistics failedStatistics = new DescriptiveStatistics(1000);
+
+    public ScraperTask(PlcDriverManager driverManager, String jobName, String connectionAlias, String connectionString,
+                       Map<String, String> fields, long requestTimeoutMs, ExecutorService handlerService) {
+        Validate.notNull(driverManager);
+        Validate.notBlank(jobName);
+        Validate.notBlank(connectionAlias);
+        Validate.notBlank(connectionString);
+        Validate.notEmpty(fields);
+        Validate.isTrue(requestTimeoutMs > 0);
+        this.driverManager = driverManager;
+        this.jobName = jobName;
+        this.connectionAlias = connectionAlias;
+        this.connectionString = connectionString;
+        this.fields = fields;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.handlerService = handlerService;
+    }
+
+    @Override
+    public void run() {
+        // Does a single fetch
+        LOGGER.trace("Start new scrape of task of job {} for connection {}", jobName, connectionAlias);
+        requestCounter.incrementAndGet();
+        StopWatch stopWatch = new StopWatch();
+        stopWatch.start();
+        try (PlcConnection connection = driverManager.getConnection(connectionString)) {
+            LOGGER.trace("Connection to {} established: {}", connectionString, connection);
+            PlcReadResponse response;
+            try {
+                PlcReadRequest.Builder builder = connection.readRequestBuilder();
+                fields.forEach((alias,qry) -> {
+                    LOGGER.trace("Requesting: {} -> {}", alias, qry);
+                    builder.addItem(alias,qry);
+                });
+                response = builder
+                    .build()
+                    .execute()
+                    .get(requestTimeoutMs, TimeUnit.MILLISECONDS);
+            } catch (ExecutionException e) {
+                // Handle execution exception
+                handleException(e);
+                return;
+            }
+            // Add statistics
+            stopWatch.stop();
+            latencyStatistics.addValue(stopWatch.getNanoTime());
+            failedStatistics.addValue(0.0);
+            successCounter.incrementAndGet();
+            // Validate response
+            validateResponse(response);
+            // Handle response (Async)
+            CompletableFuture.runAsync(() -> handle(transformResponseToMap(response)), handlerService);
+        } catch (Exception e) {
+            failedStatistics.addValue(1.0);
+            LOGGER.debug("Exception during scrape", e);
+            handleException(e);
+        }
+    }
+
+    private void validateResponse(PlcReadResponse response) {
+        Map<String, PlcResponseCode> failedFields = response.getFieldNames().stream()
+            .filter(name -> !PlcResponseCode.OK.equals(response.getResponseCode(name)))
+            .collect(Collectors.toMap(
+                Function.identity(),
+                response::getResponseCode
+            ));
+        if (failedFields.size() > 0) {
+            handleErrorResponse(failedFields);
+        }
+    }
+
+    private Map<String, Object> transformResponseToMap(PlcReadResponse response) {
+        return response.getFieldNames().stream()
+            .collect(Collectors.toMap(
+                name -> name,
+                response::getObject
+            ));
+    }
+
+    public String getJobName() {
+        return jobName;
+    }
+
+    public String getConnectionAlias() {
+        return connectionAlias;
+    }
+
+    public long getRequestCounter() {
+        return requestCounter.get();
+    }
+
+    public long getSuccessfullRequestCounter() {
+        return successCounter.get();
+    }
+
+    public DescriptiveStatistics getLatencyStatistics() {
+        return latencyStatistics;
+    }
+
+    public double getPercentageFailed() {
+        return 100.0*failedStatistics.getMean();
+    }
+
+    public void handle(Map<String, Object> result) {
+        LOGGER.debug("Handling result on gorgeous pool: {}", result);
+    }
+
+    public void handleException(Exception e) {
+        failedStatistics.addValue(1.0);
+    }
+
+    public void handleErrorResponse(Map<String, PlcResponseCode> failed) {
+        LOGGER.warn("Handling error responses: {}", failed);
+    }
+
+}
diff --git a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTaskTest.java b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTaskTest.java
new file mode 100644
index 0000000..546f3cb
--- /dev/null
+++ b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTaskTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.scraper;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.plc4x.java.PlcDriverManager;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.apache.plc4x.java.base.messages.items.DefaultStringFieldItem;
+import org.apache.plc4x.java.mock.MockDevice;
+import org.apache.plc4x.java.mock.PlcMockConnection;
+import org.assertj.core.api.WithAssertions;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.*;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.*;
+
+@ExtendWith(MockitoExtension.class)
+public class ScraperTaskTest implements WithAssertions {
+
+    @Mock
+    MockDevice mockDevice;
+
+    @Test
+    public void scrape() throws PlcConnectionException {
+        PlcDriverManager driverManager = new PlcDriverManager();
+        PlcMockConnection connection = (PlcMockConnection) driverManager.getConnection("mock:scraper");
+        connection.setDevice(mockDevice);
+        when(mockDevice.read(any())).thenReturn(Pair.of(PlcResponseCode.OK, new DefaultStringFieldItem("hallo")));
+
+        ScraperTask scraperTask = new ScraperTask(driverManager, "job1", "m1", "mock:scraper", Collections.singletonMap("a", "b"),
+            1_000, ForkJoinPool.commonPool());
+
+        scraperTask.run();
+    }
+
+    @Nested
+    class Exceptions {
+
+        @Test
+        public void badResponseCode_shouldHandleException() throws PlcConnectionException {
+            // Given
+            PlcDriverManager driverManager = new PlcDriverManager();
+            PlcMockConnection connection = (PlcMockConnection) driverManager.getConnection("mock:scraper");
+            connection.setDevice(mockDevice);
+            when(mockDevice.read(any())).thenReturn(Pair.of(PlcResponseCode.NOT_FOUND, new DefaultStringFieldItem("hallo")));
+
+            ScraperTask scraperTask = new ScraperTask(driverManager, "job1", "m1",
+                "mock:scraper", Collections.singletonMap("a", "b"), 1_000, ForkJoinPool.commonPool());
+
+            // When
+            scraperTask.run();
+        }
+
+        @Mock
+        PlcDriverManager driverManager;
+
+        @Test
+        public void handleConnectionException() throws PlcConnectionException {
+            // Given
+            when(driverManager.getConnection(anyString())).thenThrow(new PlcConnectionException("stfu"));
+
+            ScraperTask scraperTask = new ScraperTask(driverManager, "job1", "m1", "mock:scraper", Collections.singletonMap("a", "b"),
+                1_000, ForkJoinPool.commonPool());
+
+            assertThatThrownBy(scraperTask::run)
+                .isInstanceOf(PlcRuntimeException.class)
+                .hasMessageContaining("Unable to fetch connection");
+        }
+
+        @Test
+        void runByScheduler_handledGracefully() throws PlcConnectionException {
+            when(driverManager.getConnection(anyString())).thenThrow(new PlcConnectionException("stfu"));
+            ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
+            ScraperTask scraperTask = new ScraperTask(driverManager, "job1", "m1", "mock:scraper", Collections.singletonMap("a", "b"),
+                1_000, ForkJoinPool.commonPool());
+
+            Future<?> future = pool.scheduleAtFixedRate(scraperTask, 0, 10, TimeUnit.MILLISECONDS);
+
+            assertThat(future).isNotDone();
+        }
+
+    }
+}
\ No newline at end of file
diff --git a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java
index 25b526a..3a25f34 100644
--- a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java
+++ b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java
@@ -20,57 +20,81 @@
 package org.apache.plc4x.java.scraper;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.pool2.KeyedObjectPool;
 import org.apache.plc4x.java.PlcDriverManager;
+import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.types.PlcResponseCode;
-import org.apache.plc4x.java.base.messages.items.DefaultStringFieldItem;
+import org.apache.plc4x.java.base.messages.items.DefaultIntegerFieldItem;
 import org.apache.plc4x.java.mock.MockDevice;
 import org.apache.plc4x.java.mock.PlcMockConnection;
-import org.junit.Test;
-import org.mockito.Mockito;
+import org.apache.plc4x.java.utils.connectionpool.PoolKey;
+import org.apache.plc4x.java.utils.connectionpool.PooledPlcConnectionFactory;
+import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
 
-import java.util.Map;
+import java.util.Arrays;
+import java.util.Collections;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.when;
 
-public class ScraperTest {
+@ExtendWith(MockitoExtension.class)
+class ScraperTest {
 
-    @Test
-    public void scrape() throws PlcConnectionException {
-        PlcDriverManager driverManager = new PlcDriverManager();
-        PlcMockConnection connection = (PlcMockConnection) driverManager.getConnection("mock:scraper");
-        MockDevice mockDevice = Mockito.mock(MockDevice.class);
-        connection.setDevice(mockDevice);
-        when(mockDevice.read(any())).thenReturn(Pair.of(PlcResponseCode.OK, new DefaultStringFieldItem("hallo")));
+    @Mock
+    MockDevice mockDevice;
 
-        Scraper scraper = new Scraper(driverManager, "mock:scraper", 1_000, new Scraper.ResultHandler() {
-            @Override
-            public void handle(Map<String, Object> result) {
-                System.out.println(result);
-            }
+    public static final String CONN_STRING_TIM = "s7://10.10.64.22/0/1";
+    public static final String FIELD_STRING_TIM = "%DB225:DBW0:INT";
+
+        public static final String CONN_STRING_CH = "s7://10.10.64.20/0/1";
+    public static final String FIELD_STRING_CH = "%DB3:DBD32:DINT";
 
+    @Test
+    void real_stuff() throws InterruptedException {
+        PlcDriverManager driverManager = new PooledPlcDriverManager(new PooledPlcDriverManager.PoolCreator() {
             @Override
-            public void handleException(Exception e) {
-                System.err.println(e);
+            public KeyedObjectPool<PoolKey, PlcConnection> createPool(PooledPlcConnectionFactory pooledPlcConnectionFactory) {
+                return null;
             }
         });
 
-        scraper.run();
+        Scraper scraper = new Scraper(driverManager, Arrays.asList(
+            new Scraper.ScrapeJob("job1",
+                10,
+                Collections.singletonMap("tim", CONN_STRING_TIM),
+                Collections.singletonMap("distance", FIELD_STRING_TIM)
+            ),
+            new Scraper.ScrapeJob("job2",
+                10,
+                Collections.singletonMap("chris", CONN_STRING_CH),
+                Collections.singletonMap("counter", FIELD_STRING_CH)
+            )
+        ));
+
+        Thread.sleep(300_000);
     }
 
     @Test
-    public void scrape_badResponseCode_shouldHandleException() throws PlcConnectionException {
+    void scraper_schedulesJob() throws InterruptedException, PlcConnectionException {
         PlcDriverManager driverManager = new PlcDriverManager();
-        PlcMockConnection connection = (PlcMockConnection) driverManager.getConnection("mock:scraper");
-        MockDevice mockDevice = Mockito.mock(MockDevice.class);
+        PlcMockConnection connection = (PlcMockConnection) driverManager.getConnection("mock:m1");
         connection.setDevice(mockDevice);
-        when(mockDevice.read(any())).thenReturn(Pair.of(PlcResponseCode.NOT_FOUND, new DefaultStringFieldItem("hallo")));
 
-        Scraper.ResultHandler handler = Mockito.mock(Scraper.ResultHandler.class);
+        when(mockDevice.read(any())).thenReturn(Pair.of(PlcResponseCode.OK, new DefaultIntegerFieldItem(1)));
 
-        Scraper scraper = new Scraper(driverManager, "mock:scraper", 1_000, null);
+        Scraper scraper = new Scraper(driverManager, Collections.singletonList(
+            new Scraper.ScrapeJob("job1",
+                10,
+                Collections.singletonMap("m1", "mock:m1"),
+                Collections.singletonMap("field1", "qry1")
+            )
+        ));
 
-        scraper.run();
+        Thread.sleep(5_000);
     }
 }
\ No newline at end of file
diff --git a/plc4j/utils/scraper/src/test/resources/logback.xml b/plc4j/utils/scraper/src/test/resources/logback-test.xml
similarity index 97%
rename from plc4j/utils/scraper/src/test/resources/logback.xml
rename to plc4j/utils/scraper/src/test/resources/logback-test.xml
index f0f2b2e..c562020 100644
--- a/plc4j/utils/scraper/src/test/resources/logback.xml
+++ b/plc4j/utils/scraper/src/test/resources/logback-test.xml
@@ -29,7 +29,7 @@
     </encoder>
   </appender>
 
-  <root level="ERROR">
+  <root level="INFO">
     <appender-ref ref="STDOUT"/>
   </root>
 


[incubator-plc4x] 02/09: [plc4j-scraper] Added another test + logback.

Posted by jf...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jfeinauer pushed a commit to branch feature/plc4j-scraper
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 3fabc85b4e341e14f6f8d13780070b938a98ed12
Author: Julian Feinauer <j....@pragmaticminds.de>
AuthorDate: Sat Nov 24 09:32:33 2018 +0100

    [plc4j-scraper] Added another test + logback.
---
 .../apache/plc4x/java/s7/ManualS7PlcDriverMT.java  | 68 ++++++++++++++--------
 plc4j/utils/scraper/src/test/resources/logback.xml | 36 ++++++++++++
 2 files changed, 80 insertions(+), 24 deletions(-)

diff --git a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/s7/ManualS7PlcDriverMT.java b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/s7/ManualS7PlcDriverMT.java
index af60176..868b1b8 100644
--- a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/s7/ManualS7PlcDriverMT.java
+++ b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/s7/ManualS7PlcDriverMT.java
@@ -25,20 +25,20 @@ import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
 import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
 import org.junit.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
-import java.util.ArrayList;
 import java.util.Locale;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class ManualS7PlcDriverMT {
 
-    public static final String CONN_STRING = "s7://10.10.64.22/0/1";
-    public static final String FIELD_STRING = "%DB225:DBW0:INT";
+//    public static final String CONN_STRING = "s7://10.10.64.22/0/1";
+//    public static final String FIELD_STRING = "%DB225:DBW0:INT";
 
-//    public static final String CONN_STRING = "s7://10.10.64.20/0/1";
-//    public static final String FIELD_STRING = "%DB3:DBD32:DINT";
+    public static final String CONN_STRING = "s7://10.10.64.20/0/1";
+    public static final String FIELD_STRING = "%DB3:DBD32:DINT";
 
     @Test
     public void simpleLoop() {
@@ -46,30 +46,50 @@ public class ManualS7PlcDriverMT {
 
         DescriptiveStatistics statistics = new DescriptiveStatistics();
         for (int i = 1; i <= 1000; i++) {
+            double timeNs = runSingleRequest(plcDriverManager);
+            statistics.addValue(timeNs);
+        }
 
-            long start = System.nanoTime();
-            try (PlcConnection connection = plcDriverManager.getConnection(CONN_STRING)) {
-                CompletableFuture<? extends PlcReadResponse> future = connection.readRequestBuilder()
-                    .addItem("distance", FIELD_STRING)
-                    .build()
-                    .execute();
+        printStatistics(statistics);
+    }
 
-                PlcReadResponse response = future.get(10, TimeUnit.SECONDS);
+    @ParameterizedTest
+    @ValueSource(ints = {1, 5, 10, 20})
+    public void scheduledLoop(int period) throws InterruptedException {
+        PlcDriverManager plcDriverManager = new PooledPlcDriverManager();
+        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
+        DescriptiveStatistics statistics = new DescriptiveStatistics();
 
-                System.out.println(i + " " + response.getLong("distance"));
-            } catch (Exception e) {
-                e.printStackTrace();
+        int numberOfRuns = 1000;
+        AtomicInteger counter = new AtomicInteger(0);
+        executorService.scheduleAtFixedRate(() -> {
+            // System.out.println("Run: " + counter.get());
+            double timeNs = runSingleRequest(plcDriverManager);
+            statistics.addValue(timeNs);
+            if (counter.getAndIncrement() >= numberOfRuns) {
+                executorService.shutdown();
+                printStatistics(statistics);
             }
-            long end = System.nanoTime();
-            statistics.addValue((double)(end-start));
-        }
+        }, 0, period, TimeUnit.MILLISECONDS);
 
-        printStatistics(statistics);
+        executorService.awaitTermination(100, TimeUnit.SECONDS);
     }
 
-    @Test
-    public void scheduledLoop() {
+    private double runSingleRequest(PlcDriverManager plcDriverManager) {
+        long start = System.nanoTime();
+        try (PlcConnection connection = plcDriverManager.getConnection(CONN_STRING)) {
+            CompletableFuture<? extends PlcReadResponse> future = connection.readRequestBuilder()
+                .addItem("distance", FIELD_STRING)
+                .build()
+                .execute();
 
+            PlcReadResponse response = future.get(10, TimeUnit.SECONDS);
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        long end = System.nanoTime();
+        return (double)end-start;
     }
 
     private void printStatistics(DescriptiveStatistics statistics) {
diff --git a/plc4j/utils/scraper/src/test/resources/logback.xml b/plc4j/utils/scraper/src/test/resources/logback.xml
new file mode 100644
index 0000000..c562020
--- /dev/null
+++ b/plc4j/utils/scraper/src/test/resources/logback.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<configuration xmlns="http://ch.qos.logback/xml/ns/logback"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="http://ch.qos.logback/xml/ns/logback https://raw.githubusercontent.com/enricopulatzo/logback-XSD/master/src/main/xsd/logback.xsd">
+
+  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+    <!-- encoders are assigned the type
+         ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+    <encoder>
+      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+    </encoder>
+  </appender>
+
+  <root level="INFO">
+    <appender-ref ref="STDOUT"/>
+  </root>
+
+</configuration>
\ No newline at end of file


[incubator-plc4x] 09/09: [plc4j-scraper] Increased tests coverage for Scraper.java.

Posted by jf...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jfeinauer pushed a commit to branch feature/plc4j-scraper
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 78e1d315b0751692cbe19e476d81034729d62d30
Author: Julian Feinauer <j....@pragmaticminds.de>
AuthorDate: Sat Nov 24 21:28:57 2018 +0100

    [plc4j-scraper] Increased tests coverage for Scraper.java.
---
 .../org/apache/plc4x/java/scraper/Scraper.java     | 51 +++++++++++---
 .../apache/plc4x/java/scraper/ScraperTaskTest.java |  7 +-
 .../org/apache/plc4x/java/scraper/ScraperTest.java | 79 ++++++++++++++++++++--
 3 files changed, 119 insertions(+), 18 deletions(-)

diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
index 90da56a..e1f9fc1 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
@@ -31,11 +31,11 @@ import org.apache.plc4x.java.PlcDriverManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.*;
 import java.util.stream.IntStream;
 
 /**
@@ -58,6 +58,7 @@ public class Scraper {
             .build()
     );
     private final MultiValuedMap<ScrapeJob, ScraperTask> tasks = new ArrayListValuedHashMap<>();
+    private final MultiValuedMap<ScraperTask, ScheduledFuture<?>> futures = new ArrayListValuedHashMap<>();
     private final PlcDriverManager driverManager;
     private final List<ScrapeJob> jobs;
 
@@ -65,9 +66,14 @@ public class Scraper {
         Validate.notEmpty(jobs);
         this.driverManager = driverManager;
         this.jobs = jobs;
+    }
 
+    /**
+     * Start the scraping.
+     */
+    public void start() {
         // Schedule all jobs
-        LOGGER.info("Registering jobs...");
+        LOGGER.info("Starting jobs...");
         jobs.stream()
             .flatMap(job -> job.connections.entrySet().stream()
                 .map(entry -> Triple.of(job, entry.getKey(), entry.getValue()))
@@ -83,8 +89,11 @@ public class Scraper {
                         handlerPool);
                     // Add task to internal list
                     tasks.put(tuple.getLeft(), task);
-                    scheduler.scheduleAtFixedRate(task,
+                    ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(task,
                         0, tuple.getLeft().scrapeRate, TimeUnit.MILLISECONDS);
+
+                    // Store the handle for stopping, etc.
+                    futures.put(task, future);
                 }
             );
 
@@ -93,12 +102,34 @@ public class Scraper {
             for (Map.Entry<ScrapeJob, ScraperTask> entry : tasks.entries()) {
                 DescriptiveStatistics statistics = entry.getValue().getLatencyStatistics();
                 String msg = String.format(Locale.ENGLISH, "Job statistics (%s, %s) number of requests: %d (%d success, %.1f %% failed, %.1f %% too slow), mean latency: %.2f ms, median: %.2f ms",
-                    entry.getValue().getJobName(), entry.getValue().getConnectionAlias(), entry.getValue().getRequestCounter(), entry.getValue().getSuccessfullRequestCounter(), entry.getValue().getPercentageFailed(), statistics.apply(new PercentageAboveThreshold(entry.getKey().scrapeRate*1e6)), statistics.getMean()*1e-6, statistics.getPercentile(50)*1e-6);
+                    entry.getValue().getJobName(), entry.getValue().getConnectionAlias(), entry.getValue().getRequestCounter(), entry.getValue().getSuccessfullRequestCounter(), entry.getValue().getPercentageFailed(), statistics.apply(new PercentageAboveThreshold(entry.getKey().scrapeRate * 1e6)), statistics.getMean() * 1e-6, statistics.getPercentile(50) * 1e-6);
                 LOGGER.info(msg);
             }
         }, 1_000, 1_000, TimeUnit.MILLISECONDS);
     }
 
+    /**
+     * For testing.
+     */
+    ScheduledExecutorService getScheduler() {
+        return scheduler;
+    }
+
+    public int getNumberOfActiveTasks() {
+        return (int) futures.entries().stream().filter(entry -> !entry.getValue().isDone()).count();
+    }
+
+    public void stop() {
+        // Stop all futures
+        LOGGER.info("Stopping scraper...");
+        for (Map.Entry<ScraperTask, ScheduledFuture<?>> entry : futures.entries()) {
+            LOGGER.debug("Stopping task {}...", entry.getKey());
+            entry.getValue().cancel(true);
+        }
+        // Clear the map
+        futures.clear();
+    }
+
     public static class ScrapeJob {
 
         private final String name;
@@ -133,7 +164,7 @@ public class Scraper {
             long below = Arrays.stream(values)
                 .filter(val -> val <= threshold)
                 .count();
-            return (double)below/values.length;
+            return (double) below / values.length;
         }
 
         @Override
@@ -142,7 +173,7 @@ public class Scraper {
                 .mapToDouble(i -> values[i])
                 .filter(val -> val > threshold)
                 .count();
-            return 100.0*below/length;
+            return 100.0 * below / length;
         }
 
         @Override
diff --git a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTaskTest.java b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTaskTest.java
index 546f3cb..77df940 100644
--- a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTaskTest.java
+++ b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTaskTest.java
@@ -90,9 +90,10 @@ public class ScraperTaskTest implements WithAssertions {
             ScraperTask scraperTask = new ScraperTask(driverManager, "job1", "m1", "mock:scraper", Collections.singletonMap("a", "b"),
                 1_000, ForkJoinPool.commonPool());
 
-            assertThatThrownBy(scraperTask::run)
-                .isInstanceOf(PlcRuntimeException.class)
-                .hasMessageContaining("Unable to fetch connection");
+            ScraperTask spy = spy(scraperTask);
+            spy.run();
+
+            verify(spy).handleException(any());
         }
 
         @Test
diff --git a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java
index a3107ea..168825b 100644
--- a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java
+++ b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java
@@ -20,7 +20,6 @@
 package org.apache.plc4x.java.scraper;
 
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.commons.pool2.KeyedObjectPool;
 import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
 import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
 import org.apache.plc4x.java.PlcDriverManager;
@@ -30,9 +29,9 @@ import org.apache.plc4x.java.api.types.PlcResponseCode;
 import org.apache.plc4x.java.base.messages.items.DefaultIntegerFieldItem;
 import org.apache.plc4x.java.mock.MockDevice;
 import org.apache.plc4x.java.mock.PlcMockConnection;
-import org.apache.plc4x.java.utils.connectionpool.PoolKey;
-import org.apache.plc4x.java.utils.connectionpool.PooledPlcConnectionFactory;
 import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
+import org.assertj.core.api.WithAssertions;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
@@ -40,12 +39,13 @@ import org.mockito.junit.jupiter.MockitoExtension;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
-class ScraperTest {
+class ScraperTest implements WithAssertions {
 
     @Mock
     MockDevice mockDevice;
@@ -57,6 +57,7 @@ class ScraperTest {
     public static final String FIELD_STRING_CH = "%DB3:DBD32:DINT";
 
     @Test
+    @Disabled
     void real_stuff() throws InterruptedException {
         PlcDriverManager driverManager = new PooledPlcDriverManager(pooledPlcConnectionFactory -> {
             GenericKeyedObjectPoolConfig<PlcConnection> config = new GenericKeyedObjectPoolConfig<>();
@@ -102,6 +103,74 @@ class ScraperTest {
             )
         ));
 
-        Thread.sleep(5_000);
+        scraper.start();
+
+        Thread.sleep(1_000);
+
+        // Assert that tasks got done.
+        assertThat(scraper.getScheduler()).isInstanceOf(ScheduledThreadPoolExecutor.class);
+        assertThat(scraper.getNumberOfActiveTasks())
+            .isEqualTo(2);
+        assertThat(((ScheduledThreadPoolExecutor) scraper.getScheduler()).getCompletedTaskCount())
+            .isGreaterThan(10);
+    }
+
+    @Test
+    void stop_stopsAllJobs() throws PlcConnectionException {
+        PlcDriverManager driverManager = new PlcDriverManager();
+        PlcMockConnection connection = (PlcMockConnection) driverManager.getConnection("mock:m1");
+        connection.setDevice(mockDevice);
+
+        when(mockDevice.read(any())).thenReturn(Pair.of(PlcResponseCode.OK, new DefaultIntegerFieldItem(1)));
+
+        Scraper scraper = new Scraper(driverManager, Collections.singletonList(
+            new Scraper.ScrapeJob("job1",
+                1,
+                Collections.singletonMap("m1", "mock:m1"),
+                Collections.singletonMap("field1", "qry1")
+            )
+        ));
+
+        scraper.start();
+
+        assertThat(scraper.getNumberOfActiveTasks())
+            .isEqualTo(1);
+
+        scraper.stop();
+
+        assertThat(scraper.getNumberOfActiveTasks())
+            .isZero();
+    }
+
+    @Test
+    void restart_works() throws PlcConnectionException {
+        PlcDriverManager driverManager = new PlcDriverManager();
+        PlcMockConnection connection = (PlcMockConnection) driverManager.getConnection("mock:m1");
+        connection.setDevice(mockDevice);
+
+        when(mockDevice.read(any())).thenReturn(Pair.of(PlcResponseCode.OK, new DefaultIntegerFieldItem(1)));
+
+        Scraper scraper = new Scraper(driverManager, Collections.singletonList(
+            new Scraper.ScrapeJob("job1",
+                1,
+                Collections.singletonMap("m1", "mock:m1"),
+                Collections.singletonMap("field1", "qry1")
+            )
+        ));
+
+        scraper.start();
+
+        assertThat(scraper.getNumberOfActiveTasks())
+            .isEqualTo(1);
+
+        scraper.stop();
+
+        assertThat(scraper.getNumberOfActiveTasks())
+            .isZero();
+
+        scraper.start();
+
+        assertThat(scraper.getNumberOfActiveTasks())
+            .isEqualTo(1);
     }
 }
\ No newline at end of file


[incubator-plc4x] 05/09: [plc4j-scraper] Added tests.

Posted by jf...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jfeinauer pushed a commit to branch feature/plc4j-scraper
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 79c06aa90f785689d19a656c6b577be4f9311320
Author: Julian Feinauer <j....@pragmaticminds.de>
AuthorDate: Sat Nov 24 13:14:44 2018 +0100

    [plc4j-scraper] Added tests.
---
 plc4j/utils/scraper/pom.xml                        |  8 +-
 .../org/apache/plc4x/java/scraper/Scraper.java     | 90 ++++++++++++++++++++++
 .../apache/plc4x/java/s7/ManualS7PlcDriverMT.java  | 52 +++++++++++--
 .../org/apache/plc4x/java/scraper/ScraperTest.java | 76 ++++++++++++++++++
 4 files changed, 220 insertions(+), 6 deletions(-)

diff --git a/plc4j/utils/scraper/pom.xml b/plc4j/utils/scraper/pom.xml
index 9b2e372..b88e8a2 100644
--- a/plc4j/utils/scraper/pom.xml
+++ b/plc4j/utils/scraper/pom.xml
@@ -49,7 +49,13 @@
     </dependency>
     <dependency>
       <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-connection-pool</artifactId>
+      <artifactId>plc4j-connectionString-pool</artifactId>
+      <version>0.3.0-SNAPSHOT</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.plc4x</groupId>
+      <artifactId>plc4j-protocol-test</artifactId>
       <version>0.3.0-SNAPSHOT</version>
       <scope>test</scope>
     </dependency>
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
new file mode 100644
index 0000000..dfd4801
--- /dev/null
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.scraper;
+
+import org.apache.plc4x.java.PlcDriverManager;
+import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Plc Scraper that scrapes one source.
+ */
+public class Scraper implements Runnable {
+
+    private final PlcDriverManager driverManager;
+    private final String connectionString;
+    private final long requestTimeoutMs;
+    private final ResultHandler handler;
+
+    public Scraper(PlcDriverManager driverManager, String connectionString, long requestTimeoutMs, ResultHandler handler) {
+        this.driverManager = driverManager;
+        this.connectionString = connectionString;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.handler = handler;
+    }
+
+    @Override
+    public void run() {
+        // Does a single fetch
+        try (PlcConnection connection = driverManager.getConnection(connectionString)) {
+            PlcReadResponse response;
+            try {
+                response = connection.readRequestBuilder()
+                    .addItem("item1", "add1")
+                    .build()
+                    .execute()
+                    .get(requestTimeoutMs, TimeUnit.MILLISECONDS);
+            } catch (ExecutionException e) {
+                // Handle execution exception
+                handler.handleException(e);
+                return;
+            }
+            CompletableFuture.runAsync(() -> handler.handle(transformResponseToMap(response)));
+        } catch (PlcConnectionException e) {
+            throw new PlcRuntimeException("Unable to fetch", e);
+        } catch (Exception e) {
+            throw new PlcRuntimeException("Unexpected exception during fetch", e);
+        }
+    }
+
+    private Map<String, Object> transformResponseToMap(PlcReadResponse response) {
+        return response.getFieldNames().stream()
+            .collect(Collectors.toMap(
+                name -> name,
+                response::getObject
+            ));
+    }
+
+    public interface ResultHandler {
+
+        void handle(Map<String, Object> result);
+
+        void handleException(Exception e);
+
+    }
+}
diff --git a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/s7/ManualS7PlcDriverMT.java b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/s7/ManualS7PlcDriverMT.java
index 073c221..8822545 100644
--- a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/s7/ManualS7PlcDriverMT.java
+++ b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/s7/ManualS7PlcDriverMT.java
@@ -25,7 +25,8 @@ import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
 import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
-import org.junit.Test;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -36,14 +37,18 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Stream;
 
+/**
+ * Manual Test.
+ */
+@Disabled
 public class ManualS7PlcDriverMT {
 
     public static final String CONN_STRING = "s7://10.10.64.22/0/1";
     public static final String FIELD_STRING = "%DB225:DBW0:INT";
 
-
-//    public static final String CONN_STRING = "s7://10.10.64.20/0/1";
+    //    public static final String CONN_STRING = "s7://10.10.64.20/0/1";
 //    public static final String FIELD_STRING = "%DB3:DBD32:DINT";
+
     @Test
     public void simpleLoop() {
         PlcDriverManager plcDriverManager = new PooledPlcDriverManager();
@@ -79,13 +84,48 @@ public class ManualS7PlcDriverMT {
         executorService.awaitTermination(100, TimeUnit.SECONDS);
     }
 
+    @Test
+    public void parallelScheduledLoop() throws InterruptedException {
+        int period = 5;
+        PlcDriverManager plcDriverManager = new PooledPlcDriverManager();
+        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
+        DescriptiveStatistics statistics1 = new DescriptiveStatistics();
+        DescriptiveStatistics statistics2 = new DescriptiveStatistics();
+
+        int numberOfRuns = 1000;
+        AtomicInteger counter1 = new AtomicInteger(0);
+        AtomicInteger counter2 = new AtomicInteger(0);
+        executorService.scheduleAtFixedRate(() -> {
+            // System.out.println("Run: " + counter.get());
+            double timeNs = runSingleRequest(plcDriverManager);
+            statistics1.addValue(timeNs);
+            if (counter1.getAndIncrement() >= numberOfRuns) {
+                executorService.shutdown();
+            }
+        }, 0, period, TimeUnit.MILLISECONDS);
+        executorService.scheduleAtFixedRate(() -> {
+            // System.out.println("Run: " + counter.get());
+            double timeNs = runSingleRequest(plcDriverManager);
+            statistics2.addValue(timeNs);
+            if (counter2.getAndIncrement() >= numberOfRuns) {
+                executorService.shutdown();
+            }
+        }, 0, period, TimeUnit.MILLISECONDS);
+
+        executorService.awaitTermination(100, TimeUnit.SECONDS);
+        System.out.println("Statistics 1");
+        printStatistics(statistics1);
+        System.out.println("Statistics 2");
+        printStatistics(statistics2);
+    }
+
     private static Stream<Arguments> periodAndRus() {
         return Stream.of(
             Arguments.of(10, 100),
             Arguments.of(10, 1000),
             Arguments.of(100, 100),
             Arguments.of(100, 1000)
-            );
+        );
     }
 
     @ParameterizedTest
@@ -122,7 +162,6 @@ public class ManualS7PlcDriverMT {
                     }
                     if (counter.getAndIncrement() >= numberOfRuns) {
                         executorService.shutdown();
-                        ManualS7PlcDriverMT.this.printStatistics(statistics);
                     }
                 }, period, TimeUnit.MILLISECONDS);
             }
@@ -130,11 +169,14 @@ public class ManualS7PlcDriverMT {
 
         executorService.scheduleAtFixedRate(iteration, 0, period, TimeUnit.MILLISECONDS);
         executorService.awaitTermination(100, TimeUnit.SECONDS);
+        // Print statistics
+        ManualS7PlcDriverMT.this.printStatistics(statistics);
     }
 
     private double runSingleRequest(PlcDriverManager plcDriverManager) {
         long start = System.nanoTime();
         try (PlcConnection connection = plcDriverManager.getConnection(CONN_STRING)) {
+            System.out.println("Connection: " + connection);
             CompletableFuture<? extends PlcReadResponse> future = connection.readRequestBuilder()
                 .addItem("distance", FIELD_STRING)
                 .build()
diff --git a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java
new file mode 100644
index 0000000..25b526a
--- /dev/null
+++ b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.scraper;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.plc4x.java.PlcDriverManager;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.apache.plc4x.java.base.messages.items.DefaultStringFieldItem;
+import org.apache.plc4x.java.mock.MockDevice;
+import org.apache.plc4x.java.mock.PlcMockConnection;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Map;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+public class ScraperTest {
+
+    @Test
+    public void scrape() throws PlcConnectionException {
+        PlcDriverManager driverManager = new PlcDriverManager();
+        PlcMockConnection connection = (PlcMockConnection) driverManager.getConnection("mock:scraper");
+        MockDevice mockDevice = Mockito.mock(MockDevice.class);
+        connection.setDevice(mockDevice);
+        when(mockDevice.read(any())).thenReturn(Pair.of(PlcResponseCode.OK, new DefaultStringFieldItem("hallo")));
+
+        Scraper scraper = new Scraper(driverManager, "mock:scraper", 1_000, new Scraper.ResultHandler() {
+            @Override
+            public void handle(Map<String, Object> result) {
+                System.out.println(result);
+            }
+
+            @Override
+            public void handleException(Exception e) {
+                System.err.println(e);
+            }
+        });
+
+        scraper.run();
+    }
+
+    @Test
+    public void scrape_badResponseCode_shouldHandleException() throws PlcConnectionException {
+        PlcDriverManager driverManager = new PlcDriverManager();
+        PlcMockConnection connection = (PlcMockConnection) driverManager.getConnection("mock:scraper");
+        MockDevice mockDevice = Mockito.mock(MockDevice.class);
+        connection.setDevice(mockDevice);
+        when(mockDevice.read(any())).thenReturn(Pair.of(PlcResponseCode.NOT_FOUND, new DefaultStringFieldItem("hallo")));
+
+        Scraper.ResultHandler handler = Mockito.mock(Scraper.ResultHandler.class);
+
+        Scraper scraper = new Scraper(driverManager, "mock:scraper", 1_000, null);
+
+        scraper.run();
+    }
+}
\ No newline at end of file


[incubator-plc4x] 08/09: [plc4j-scraper] Working state.

Posted by jf...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jfeinauer pushed a commit to branch feature/plc4j-scraper
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit ad3a13605c962ad9eb1e547c7392e156997eb211
Author: Julian Feinauer <j....@pragmaticminds.de>
AuthorDate: Sat Nov 24 19:35:45 2018 +0100

    [plc4j-scraper] Working state.
---
 .../src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java       | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java
index bb33c1a..a3107ea 100644
--- a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java
+++ b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java
@@ -60,6 +60,7 @@ class ScraperTest {
     void real_stuff() throws InterruptedException {
         PlcDriverManager driverManager = new PooledPlcDriverManager(pooledPlcConnectionFactory -> {
             GenericKeyedObjectPoolConfig<PlcConnection> config = new GenericKeyedObjectPoolConfig<>();
+            config.setJmxEnabled(true);
             config.setMaxWaitMillis(-1);
             config.setMaxTotal(3);
             config.setMinIdlePerKey(0);
@@ -82,7 +83,7 @@ class ScraperTest {
             )
         ));
 
-        Thread.sleep(300_000);
+        Thread.sleep(30_000_000);
     }
 
     @Test


[incubator-plc4x] 07/09: [plc4j-scraper] Fix for S7 Connection. Further implementation.

Posted by jf...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jfeinauer pushed a commit to branch feature/plc4j-scraper
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 1463e49c3bf63672f4c09e4ed508d406785957bc
Author: Julian Feinauer <j....@pragmaticminds.de>
AuthorDate: Sat Nov 24 18:45:39 2018 +0100

    [plc4j-scraper] Fix for S7 Connection. Further implementation.
---
 .../java/base/connection/NettyPlcConnection.java   |  2 +-
 .../org/apache/plc4x/java/scraper/ScraperTask.java | 23 ++++++++++++++++++++--
 .../org/apache/plc4x/java/scraper/ScraperTest.java | 18 +++++++++++------
 3 files changed, 34 insertions(+), 9 deletions(-)

diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/connection/NettyPlcConnection.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/connection/NettyPlcConnection.java
index 2f992b1..fd1ac78 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/connection/NettyPlcConnection.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/connection/NettyPlcConnection.java
@@ -97,7 +97,7 @@ public abstract class NettyPlcConnection extends AbstractPlcConnection {
 
     @Override
     public boolean isConnected() {
-        return connected;
+        return connected && channel.isActive();
     }
 
     public Channel getChannel() {
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/ScraperTask.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/ScraperTask.java
index 9bbf5e4..b762c7a 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/ScraperTask.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/ScraperTask.java
@@ -24,6 +24,8 @@ import org.apache.commons.lang3.time.StopWatch;
 import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
 import org.apache.plc4x.java.PlcDriverManager;
 import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
 import org.apache.plc4x.java.api.types.PlcResponseCode;
@@ -83,7 +85,16 @@ public class ScraperTask implements Runnable {
         requestCounter.incrementAndGet();
         StopWatch stopWatch = new StopWatch();
         stopWatch.start();
-        try (PlcConnection connection = driverManager.getConnection(connectionString)) {
+        PlcConnection connection = null;
+        try {
+            CompletableFuture<PlcConnection> future = CompletableFuture.supplyAsync(() -> {
+                try {
+                    return driverManager.getConnection(connectionString);
+                } catch (PlcConnectionException e) {
+                    throw new PlcRuntimeException(e);
+                }
+            }, handlerService);
+            connection = future.get(10*requestTimeoutMs, TimeUnit.MILLISECONDS);
             LOGGER.trace("Connection to {} established: {}", connectionString, connection);
             PlcReadResponse response;
             try {
@@ -111,9 +122,16 @@ public class ScraperTask implements Runnable {
             // Handle response (Async)
             CompletableFuture.runAsync(() -> handle(transformResponseToMap(response)), handlerService);
         } catch (Exception e) {
-            failedStatistics.addValue(1.0);
             LOGGER.debug("Exception during scrape", e);
             handleException(e);
+        } finally {
+            if (connection != null) {
+                try {
+                    connection.close();
+                } catch (Exception e) {
+                    // intentionally do nothing
+                }
+            }
         }
     }
 
@@ -166,6 +184,7 @@ public class ScraperTask implements Runnable {
     }
 
     public void handleException(Exception e) {
+        LOGGER.debug("Exception: ", e);
         failedStatistics.addValue(1.0);
     }
 
diff --git a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java
index 3a25f34..bb33c1a 100644
--- a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java
+++ b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java
@@ -21,6 +21,8 @@ package org.apache.plc4x.java.scraper;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
 import org.apache.plc4x.java.PlcDriverManager;
 import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
@@ -51,16 +53,20 @@ class ScraperTest {
     public static final String CONN_STRING_TIM = "s7://10.10.64.22/0/1";
     public static final String FIELD_STRING_TIM = "%DB225:DBW0:INT";
 
-        public static final String CONN_STRING_CH = "s7://10.10.64.20/0/1";
+    public static final String CONN_STRING_CH = "s7://10.10.64.20/0/1";
     public static final String FIELD_STRING_CH = "%DB3:DBD32:DINT";
 
     @Test
     void real_stuff() throws InterruptedException {
-        PlcDriverManager driverManager = new PooledPlcDriverManager(new PooledPlcDriverManager.PoolCreator() {
-            @Override
-            public KeyedObjectPool<PoolKey, PlcConnection> createPool(PooledPlcConnectionFactory pooledPlcConnectionFactory) {
-                return null;
-            }
+        PlcDriverManager driverManager = new PooledPlcDriverManager(pooledPlcConnectionFactory -> {
+            GenericKeyedObjectPoolConfig<PlcConnection> config = new GenericKeyedObjectPoolConfig<>();
+            config.setMaxWaitMillis(-1);
+            config.setMaxTotal(3);
+            config.setMinIdlePerKey(0);
+            config.setBlockWhenExhausted(true);
+            config.setTestOnBorrow(true);
+            config.setTestOnReturn(true);
+            return new GenericKeyedObjectPool<>(pooledPlcConnectionFactory, config);
         });
 
         Scraper scraper = new Scraper(driverManager, Arrays.asList(


[incubator-plc4x] 01/09: [plc4j-scraper] Added module, initial tests.

Posted by jf...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jfeinauer pushed a commit to branch feature/plc4j-scraper
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 1e2263f25f9b660d2a599f72eae29255cc2b7792
Author: Julian Feinauer <j....@pragmaticminds.de>
AuthorDate: Sat Nov 24 09:09:34 2018 +0100

    [plc4j-scraper] Added module, initial tests.
---
 plc4j/utils/pom.xml                                |  1 +
 plc4j/utils/scraper/pom.xml                        | 59 +++++++++++++++
 .../apache/plc4x/java/s7/ManualS7PlcDriverMT.java  | 85 ++++++++++++++++++++++
 3 files changed, 145 insertions(+)

diff --git a/plc4j/utils/pom.xml b/plc4j/utils/pom.xml
index c9f3d82..f511bd8 100644
--- a/plc4j/utils/pom.xml
+++ b/plc4j/utils/pom.xml
@@ -39,6 +39,7 @@
     <module>raw-sockets</module>
     <module>test-utils</module>
     <module>wireshark-utils</module>
+    <module>scraper</module>
   </modules>
 
 </project>
\ No newline at end of file
diff --git a/plc4j/utils/scraper/pom.xml b/plc4j/utils/scraper/pom.xml
new file mode 100644
index 0000000..9b2e372
--- /dev/null
+++ b/plc4j/utils/scraper/pom.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>plc4j-utils</artifactId>
+    <groupId>org.apache.plc4x</groupId>
+    <version>0.3.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>plc4j-scraper</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-math3</artifactId>
+      <version>3.5</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.plc4x</groupId>
+      <artifactId>plc4j-api</artifactId>
+      <version>0.3.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.plc4x</groupId>
+      <artifactId>plc4j-driver-s7</artifactId>
+      <version>0.3.0-SNAPSHOT</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.plc4x</groupId>
+      <artifactId>plc4j-connection-pool</artifactId>
+      <version>0.3.0-SNAPSHOT</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+
+</project>
\ No newline at end of file
diff --git a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/s7/ManualS7PlcDriverMT.java b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/s7/ManualS7PlcDriverMT.java
new file mode 100644
index 0000000..af60176
--- /dev/null
+++ b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/s7/ManualS7PlcDriverMT.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.s7;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.plc4x.java.PlcDriverManager;
+import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Locale;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class ManualS7PlcDriverMT {
+
+    public static final String CONN_STRING = "s7://10.10.64.22/0/1";
+    public static final String FIELD_STRING = "%DB225:DBW0:INT";
+
+//    public static final String CONN_STRING = "s7://10.10.64.20/0/1";
+//    public static final String FIELD_STRING = "%DB3:DBD32:DINT";
+
+    @Test
+    public void simpleLoop() {
+        PlcDriverManager plcDriverManager = new PooledPlcDriverManager();
+
+        DescriptiveStatistics statistics = new DescriptiveStatistics();
+        for (int i = 1; i <= 1000; i++) {
+
+            long start = System.nanoTime();
+            try (PlcConnection connection = plcDriverManager.getConnection(CONN_STRING)) {
+                CompletableFuture<? extends PlcReadResponse> future = connection.readRequestBuilder()
+                    .addItem("distance", FIELD_STRING)
+                    .build()
+                    .execute();
+
+                PlcReadResponse response = future.get(10, TimeUnit.SECONDS);
+
+                System.out.println(i + " " + response.getLong("distance"));
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+            long end = System.nanoTime();
+            statistics.addValue((double)(end-start));
+        }
+
+        printStatistics(statistics);
+    }
+
+    @Test
+    public void scheduledLoop() {
+
+    }
+
+    private void printStatistics(DescriptiveStatistics statistics) {
+        System.out.println("Mean response time: " + TimeUnit.NANOSECONDS.toMillis((long)statistics.getMean()) + " ms");
+        System.out.println("Median response time: " + TimeUnit.NANOSECONDS.toMillis((long)statistics.getPercentile(50)) + " ms");
+        for (int i = 10; i <= 90; i += 10) {
+            System.out.println(String.format(Locale.ENGLISH, "Percentile %3d %%: %5d ms", i, TimeUnit.NANOSECONDS.toMillis((long) statistics.getPercentile(i))));
+        }
+        for (int i = 91; i <= 100; i++) {
+            System.out.println(String.format(Locale.ENGLISH, "Percentile %3d %%: %5d ms", i, TimeUnit.NANOSECONDS.toMillis((long) statistics.getPercentile(i))));
+        }
+    }
+}