You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by jf...@apache.org on 2018/11/24 20:30:41 UTC

[incubator-plc4x] 02/02: [plc4j-scraper] Increased test coverage for Scraper.java.

This is an automated email from the ASF dual-hosted git repository.

jfeinauer pushed a commit to branch feature/plc4j-scraper
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 1e3bfeeda94b26cd07e4e78c4f8bbc0bbe7b08a7
Author: Julian Feinauer <j....@pragmaticminds.de>
AuthorDate: Sat Nov 24 21:28:57 2018 +0100

    [plc4j-scraper] Increased test coverage for Scraper.java.
---
 .../org/apache/plc4x/java/scraper/Scraper.java     | 51 +++++++++++---
 .../apache/plc4x/java/scraper/ScraperTaskTest.java |  7 +-
 .../org/apache/plc4x/java/scraper/ScraperTest.java | 79 ++++++++++++++++++++--
 3 files changed, 119 insertions(+), 18 deletions(-)

diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
index 90da56a..e1f9fc1 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
@@ -31,11 +31,11 @@ import org.apache.plc4x.java.PlcDriverManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.*;
 import java.util.stream.IntStream;
 
 /**
@@ -58,6 +58,7 @@ public class Scraper {
             .build()
     );
     private final MultiValuedMap<ScrapeJob, ScraperTask> tasks = new ArrayListValuedHashMap<>();
+    private final MultiValuedMap<ScraperTask, ScheduledFuture<?>> futures = new ArrayListValuedHashMap<>();
     private final PlcDriverManager driverManager;
     private final List<ScrapeJob> jobs;
 
@@ -65,9 +66,14 @@ public class Scraper {
         Validate.notEmpty(jobs);
         this.driverManager = driverManager;
         this.jobs = jobs;
+    }
 
+    /**
+     * Start the scraping.
+     */
+    public void start() {
         // Schedule all jobs
-        LOGGER.info("Registering jobs...");
+        LOGGER.info("Starting jobs...");
         jobs.stream()
             .flatMap(job -> job.connections.entrySet().stream()
                 .map(entry -> Triple.of(job, entry.getKey(), entry.getValue()))
@@ -83,8 +89,11 @@ public class Scraper {
                         handlerPool);
                     // Add task to internal list
                     tasks.put(tuple.getLeft(), task);
-                    scheduler.scheduleAtFixedRate(task,
+                    ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(task,
                         0, tuple.getLeft().scrapeRate, TimeUnit.MILLISECONDS);
+
+                    // Store the handle for stopping, etc.
+                    futures.put(task, future);
                 }
             );
 
@@ -93,12 +102,34 @@ public class Scraper {
             for (Map.Entry<ScrapeJob, ScraperTask> entry : tasks.entries()) {
                 DescriptiveStatistics statistics = entry.getValue().getLatencyStatistics();
                 String msg = String.format(Locale.ENGLISH, "Job statistics (%s, %s) number of requests: %d (%d success, %.1f %% failed, %.1f %% too slow), mean latency: %.2f ms, median: %.2f ms",
-                    entry.getValue().getJobName(), entry.getValue().getConnectionAlias(), entry.getValue().getRequestCounter(), entry.getValue().getSuccessfullRequestCounter(), entry.getValue().getPercentageFailed(), statistics.apply(new PercentageAboveThreshold(entry.getKey().scrapeRate*1e6)), statistics.getMean()*1e-6, statistics.getPercentile(50)*1e-6);
+                    entry.getValue().getJobName(), entry.getValue().getConnectionAlias(), entry.getValue().getRequestCounter(), entry.getValue().getSuccessfullRequestCounter(), entry.getValue().getPercentageFailed(), statistics.apply(new PercentageAboveThreshold(entry.getKey().scrapeRate * 1e6)), statistics.getMean() * 1e-6, statistics.getPercentile(50) * 1e-6);
                 LOGGER.info(msg);
             }
         }, 1_000, 1_000, TimeUnit.MILLISECONDS);
     }
 
+    /**
+     * For testing.
+     */
+    ScheduledExecutorService getScheduler() {
+        return scheduler;
+    }
+
+    public int getNumberOfActiveTasks() {
+        return (int) futures.entries().stream().filter(entry -> !entry.getValue().isDone()).count();
+    }
+
+    public void stop() {
+        // Stop all futures
+        LOGGER.info("Stopping scraper...");
+        for (Map.Entry<ScraperTask, ScheduledFuture<?>> entry : futures.entries()) {
+            LOGGER.debug("Stopping task {}...", entry.getKey());
+            entry.getValue().cancel(true);
+        }
+        // Clear the map
+        futures.clear();
+    }
+
     public static class ScrapeJob {
 
         private final String name;
@@ -133,7 +164,7 @@ public class Scraper {
             long below = Arrays.stream(values)
                 .filter(val -> val <= threshold)
                 .count();
-            return (double)below/values.length;
+            return (double) below / values.length;
         }
 
         @Override
@@ -142,7 +173,7 @@ public class Scraper {
                 .mapToDouble(i -> values[i])
                 .filter(val -> val > threshold)
                 .count();
-            return 100.0*below/length;
+            return 100.0 * below / length;
         }
 
         @Override
diff --git a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTaskTest.java b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTaskTest.java
index 546f3cb..77df940 100644
--- a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTaskTest.java
+++ b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTaskTest.java
@@ -90,9 +90,10 @@ public class ScraperTaskTest implements WithAssertions {
             ScraperTask scraperTask = new ScraperTask(driverManager, "job1", "m1", "mock:scraper", Collections.singletonMap("a", "b"),
                 1_000, ForkJoinPool.commonPool());
 
-            assertThatThrownBy(scraperTask::run)
-                .isInstanceOf(PlcRuntimeException.class)
-                .hasMessageContaining("Unable to fetch connection");
+            ScraperTask spy = spy(scraperTask);
+            spy.run();
+
+            verify(spy).handleException(any());
         }
 
         @Test
diff --git a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java
index a3107ea..168825b 100644
--- a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java
+++ b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java
@@ -20,7 +20,6 @@
 package org.apache.plc4x.java.scraper;
 
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.commons.pool2.KeyedObjectPool;
 import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
 import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
 import org.apache.plc4x.java.PlcDriverManager;
@@ -30,9 +29,9 @@ import org.apache.plc4x.java.api.types.PlcResponseCode;
 import org.apache.plc4x.java.base.messages.items.DefaultIntegerFieldItem;
 import org.apache.plc4x.java.mock.MockDevice;
 import org.apache.plc4x.java.mock.PlcMockConnection;
-import org.apache.plc4x.java.utils.connectionpool.PoolKey;
-import org.apache.plc4x.java.utils.connectionpool.PooledPlcConnectionFactory;
 import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
+import org.assertj.core.api.WithAssertions;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
@@ -40,12 +39,13 @@ import org.mockito.junit.jupiter.MockitoExtension;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
-class ScraperTest {
+class ScraperTest implements WithAssertions {
 
     @Mock
     MockDevice mockDevice;
@@ -57,6 +57,7 @@ class ScraperTest {
     public static final String FIELD_STRING_CH = "%DB3:DBD32:DINT";
 
     @Test
+    @Disabled
     void real_stuff() throws InterruptedException {
         PlcDriverManager driverManager = new PooledPlcDriverManager(pooledPlcConnectionFactory -> {
             GenericKeyedObjectPoolConfig<PlcConnection> config = new GenericKeyedObjectPoolConfig<>();
@@ -102,6 +103,74 @@ class ScraperTest {
             )
         ));
 
-        Thread.sleep(5_000);
+        scraper.start();
+
+        Thread.sleep(1_000);
+
+        // Assert that tasks got done.
+        assertThat(scraper.getScheduler()).isInstanceOf(ScheduledThreadPoolExecutor.class);
+        assertThat(scraper.getNumberOfActiveTasks())
+            .isEqualTo(2);
+        assertThat(((ScheduledThreadPoolExecutor) scraper.getScheduler()).getCompletedTaskCount())
+            .isGreaterThan(10);
+    }
+
+    @Test
+    void stop_stopsAllJobs() throws PlcConnectionException {
+        PlcDriverManager driverManager = new PlcDriverManager();
+        PlcMockConnection connection = (PlcMockConnection) driverManager.getConnection("mock:m1");
+        connection.setDevice(mockDevice);
+
+        when(mockDevice.read(any())).thenReturn(Pair.of(PlcResponseCode.OK, new DefaultIntegerFieldItem(1)));
+
+        Scraper scraper = new Scraper(driverManager, Collections.singletonList(
+            new Scraper.ScrapeJob("job1",
+                1,
+                Collections.singletonMap("m1", "mock:m1"),
+                Collections.singletonMap("field1", "qry1")
+            )
+        ));
+
+        scraper.start();
+
+        assertThat(scraper.getNumberOfActiveTasks())
+            .isEqualTo(1);
+
+        scraper.stop();
+
+        assertThat(scraper.getNumberOfActiveTasks())
+            .isZero();
+    }
+
+    @Test
+    void restart_works() throws PlcConnectionException {
+        PlcDriverManager driverManager = new PlcDriverManager();
+        PlcMockConnection connection = (PlcMockConnection) driverManager.getConnection("mock:m1");
+        connection.setDevice(mockDevice);
+
+        when(mockDevice.read(any())).thenReturn(Pair.of(PlcResponseCode.OK, new DefaultIntegerFieldItem(1)));
+
+        Scraper scraper = new Scraper(driverManager, Collections.singletonList(
+            new Scraper.ScrapeJob("job1",
+                1,
+                Collections.singletonMap("m1", "mock:m1"),
+                Collections.singletonMap("field1", "qry1")
+            )
+        ));
+
+        scraper.start();
+
+        assertThat(scraper.getNumberOfActiveTasks())
+            .isEqualTo(1);
+
+        scraper.stop();
+
+        assertThat(scraper.getNumberOfActiveTasks())
+            .isZero();
+
+        scraper.start();
+
+        assertThat(scraper.getNumberOfActiveTasks())
+            .isEqualTo(1);
     }
 }
\ No newline at end of file