You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by jf...@apache.org on 2018/11/24 20:30:41 UTC
[incubator-plc4x] 02/02: [plc4j-scraper] Increased tests coverage
for Scraper.java.
This is an automated email from the ASF dual-hosted git repository.
jfeinauer pushed a commit to branch feature/plc4j-scraper
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
commit 1e3bfeeda94b26cd07e4e78c4f8bbc0bbe7b08a7
Author: Julian Feinauer <j....@pragmaticminds.de>
AuthorDate: Sat Nov 24 21:28:57 2018 +0100
[plc4j-scraper] Increased tests coverage for Scraper.java.
---
.../org/apache/plc4x/java/scraper/Scraper.java | 51 +++++++++++---
.../apache/plc4x/java/scraper/ScraperTaskTest.java | 7 +-
.../org/apache/plc4x/java/scraper/ScraperTest.java | 79 ++++++++++++++++++++--
3 files changed, 119 insertions(+), 18 deletions(-)
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
index 90da56a..e1f9fc1 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
@@ -31,11 +31,11 @@ import org.apache.plc4x.java.PlcDriverManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.*;
import java.util.stream.IntStream;
/**
@@ -58,6 +58,7 @@ public class Scraper {
.build()
);
private final MultiValuedMap<ScrapeJob, ScraperTask> tasks = new ArrayListValuedHashMap<>();
+ private final MultiValuedMap<ScraperTask, ScheduledFuture<?>> futures = new ArrayListValuedHashMap<>();
private final PlcDriverManager driverManager;
private final List<ScrapeJob> jobs;
@@ -65,9 +66,14 @@ public class Scraper {
Validate.notEmpty(jobs);
this.driverManager = driverManager;
this.jobs = jobs;
+ }
+ /**
+ * Start the scraping.
+ */
+ public void start() {
// Schedule all jobs
- LOGGER.info("Registering jobs...");
+ LOGGER.info("Starting jobs...");
jobs.stream()
.flatMap(job -> job.connections.entrySet().stream()
.map(entry -> Triple.of(job, entry.getKey(), entry.getValue()))
@@ -83,8 +89,11 @@ public class Scraper {
handlerPool);
// Add task to internal list
tasks.put(tuple.getLeft(), task);
- scheduler.scheduleAtFixedRate(task,
+ ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(task,
0, tuple.getLeft().scrapeRate, TimeUnit.MILLISECONDS);
+
+ // Store the handle for stopping, etc.
+ futures.put(task, future);
}
);
@@ -93,12 +102,34 @@ public class Scraper {
for (Map.Entry<ScrapeJob, ScraperTask> entry : tasks.entries()) {
DescriptiveStatistics statistics = entry.getValue().getLatencyStatistics();
String msg = String.format(Locale.ENGLISH, "Job statistics (%s, %s) number of requests: %d (%d success, %.1f %% failed, %.1f %% too slow), mean latency: %.2f ms, median: %.2f ms",
- entry.getValue().getJobName(), entry.getValue().getConnectionAlias(), entry.getValue().getRequestCounter(), entry.getValue().getSuccessfullRequestCounter(), entry.getValue().getPercentageFailed(), statistics.apply(new PercentageAboveThreshold(entry.getKey().scrapeRate*1e6)), statistics.getMean()*1e-6, statistics.getPercentile(50)*1e-6);
+ entry.getValue().getJobName(), entry.getValue().getConnectionAlias(), entry.getValue().getRequestCounter(), entry.getValue().getSuccessfullRequestCounter(), entry.getValue().getPercentageFailed(), statistics.apply(new PercentageAboveThreshold(entry.getKey().scrapeRate * 1e6)), statistics.getMean() * 1e-6, statistics.getPercentile(50) * 1e-6);
LOGGER.info(msg);
}
}, 1_000, 1_000, TimeUnit.MILLISECONDS);
}
+ /**
+ * For testing.
+ */
+ ScheduledExecutorService getScheduler() {
+ return scheduler;
+ }
+
+ public int getNumberOfActiveTasks() {
+ return (int) futures.entries().stream().filter(entry -> !entry.getValue().isDone()).count();
+ }
+
+ public void stop() {
+ // Stop all futures
+ LOGGER.info("Stopping scraper...");
+ for (Map.Entry<ScraperTask, ScheduledFuture<?>> entry : futures.entries()) {
+ LOGGER.debug("Stopping task {}...", entry.getKey());
+ entry.getValue().cancel(true);
+ }
+ // Clear the map
+ futures.clear();
+ }
+
public static class ScrapeJob {
private final String name;
@@ -133,7 +164,7 @@ public class Scraper {
long below = Arrays.stream(values)
.filter(val -> val <= threshold)
.count();
- return (double)below/values.length;
+ return (double) below / values.length;
}
@Override
@@ -142,7 +173,7 @@ public class Scraper {
.mapToDouble(i -> values[i])
.filter(val -> val > threshold)
.count();
- return 100.0*below/length;
+ return 100.0 * below / length;
}
@Override
diff --git a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTaskTest.java b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTaskTest.java
index 546f3cb..77df940 100644
--- a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTaskTest.java
+++ b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTaskTest.java
@@ -90,9 +90,10 @@ public class ScraperTaskTest implements WithAssertions {
ScraperTask scraperTask = new ScraperTask(driverManager, "job1", "m1", "mock:scraper", Collections.singletonMap("a", "b"),
1_000, ForkJoinPool.commonPool());
- assertThatThrownBy(scraperTask::run)
- .isInstanceOf(PlcRuntimeException.class)
- .hasMessageContaining("Unable to fetch connection");
+ ScraperTask spy = spy(scraperTask);
+ spy.run();
+
+ verify(spy).handleException(any());
}
@Test
diff --git a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java
index a3107ea..168825b 100644
--- a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java
+++ b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java
@@ -20,7 +20,6 @@
package org.apache.plc4x.java.scraper;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.commons.pool2.KeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
import org.apache.plc4x.java.PlcDriverManager;
@@ -30,9 +29,9 @@ import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.java.base.messages.items.DefaultIntegerFieldItem;
import org.apache.plc4x.java.mock.MockDevice;
import org.apache.plc4x.java.mock.PlcMockConnection;
-import org.apache.plc4x.java.utils.connectionpool.PoolKey;
-import org.apache.plc4x.java.utils.connectionpool.PooledPlcConnectionFactory;
import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
+import org.assertj.core.api.WithAssertions;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
@@ -40,12 +39,13 @@ import org.mockito.junit.jupiter.MockitoExtension;
import java.util.Arrays;
import java.util.Collections;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
-class ScraperTest {
+class ScraperTest implements WithAssertions {
@Mock
MockDevice mockDevice;
@@ -57,6 +57,7 @@ class ScraperTest {
public static final String FIELD_STRING_CH = "%DB3:DBD32:DINT";
@Test
+ @Disabled
void real_stuff() throws InterruptedException {
PlcDriverManager driverManager = new PooledPlcDriverManager(pooledPlcConnectionFactory -> {
GenericKeyedObjectPoolConfig<PlcConnection> config = new GenericKeyedObjectPoolConfig<>();
@@ -102,6 +103,74 @@ class ScraperTest {
)
));
- Thread.sleep(5_000);
+ scraper.start();
+
+ Thread.sleep(1_000);
+
+ // Assert that tasks got done.
+ assertThat(scraper.getScheduler()).isInstanceOf(ScheduledThreadPoolExecutor.class);
+ assertThat(scraper.getNumberOfActiveTasks())
+ .isEqualTo(2);
+ assertThat(((ScheduledThreadPoolExecutor) scraper.getScheduler()).getCompletedTaskCount())
+ .isGreaterThan(10);
+ }
+
+ @Test
+ void stop_stopsAllJobs() throws PlcConnectionException {
+ PlcDriverManager driverManager = new PlcDriverManager();
+ PlcMockConnection connection = (PlcMockConnection) driverManager.getConnection("mock:m1");
+ connection.setDevice(mockDevice);
+
+ when(mockDevice.read(any())).thenReturn(Pair.of(PlcResponseCode.OK, new DefaultIntegerFieldItem(1)));
+
+ Scraper scraper = new Scraper(driverManager, Collections.singletonList(
+ new Scraper.ScrapeJob("job1",
+ 1,
+ Collections.singletonMap("m1", "mock:m1"),
+ Collections.singletonMap("field1", "qry1")
+ )
+ ));
+
+ scraper.start();
+
+ assertThat(scraper.getNumberOfActiveTasks())
+ .isEqualTo(1);
+
+ scraper.stop();
+
+ assertThat(scraper.getNumberOfActiveTasks())
+ .isZero();
+ }
+
+ @Test
+ void restart_works() throws PlcConnectionException {
+ PlcDriverManager driverManager = new PlcDriverManager();
+ PlcMockConnection connection = (PlcMockConnection) driverManager.getConnection("mock:m1");
+ connection.setDevice(mockDevice);
+
+ when(mockDevice.read(any())).thenReturn(Pair.of(PlcResponseCode.OK, new DefaultIntegerFieldItem(1)));
+
+ Scraper scraper = new Scraper(driverManager, Collections.singletonList(
+ new Scraper.ScrapeJob("job1",
+ 1,
+ Collections.singletonMap("m1", "mock:m1"),
+ Collections.singletonMap("field1", "qry1")
+ )
+ ));
+
+ scraper.start();
+
+ assertThat(scraper.getNumberOfActiveTasks())
+ .isEqualTo(1);
+
+ scraper.stop();
+
+ assertThat(scraper.getNumberOfActiveTasks())
+ .isZero();
+
+ scraper.start();
+
+ assertThat(scraper.getNumberOfActiveTasks())
+ .isEqualTo(1);
}
}
\ No newline at end of file