Posted to commits@plc4x.apache.org by jf...@apache.org on 2018/11/24 20:35:56 UTC
[incubator-plc4x] 06/09: [plc4j-scraper] Current state.
This is an automated email from the ASF dual-hosted git repository.
jfeinauer pushed a commit to branch feature/plc4j-scraper
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
commit 07886c2f6aa314e71235a948a3d3b03666067114
Author: Julian Feinauer <j....@pragmaticminds.de>
AuthorDate: Sat Nov 24 17:40:58 2018 +0100
[plc4j-scraper] Current state.
---
.../plc4x/java/s7/connection/S7PlcConnection.java | 2 +-
plc4j/utils/scraper/pom.xml | 15 +-
.../org/apache/plc4x/java/scraper/Scraper.java | 161 +++++++++++++------
.../org/apache/plc4x/java/scraper/ScraperTask.java | 176 +++++++++++++++++++++
.../apache/plc4x/java/scraper/ScraperTaskTest.java | 111 +++++++++++++
.../org/apache/plc4x/java/scraper/ScraperTest.java | 78 +++++----
.../resources/{logback.xml => logback-test.xml} | 2 +-
7 files changed, 466 insertions(+), 79 deletions(-)
diff --git a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/connection/S7PlcConnection.java b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/connection/S7PlcConnection.java
index a56228e..63609d5 100644
--- a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/connection/S7PlcConnection.java
+++ b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/connection/S7PlcConnection.java
@@ -242,7 +242,7 @@ public class S7PlcConnection extends NettyPlcConnection implements PlcReader, Pl
// If the remote didn't close the connection within the given time-frame, we have to take
// care of closing the connection.
catch (TimeoutException e) {
- logger.info("Remote didn't close connection within the configured timeout of {}ms, shutting down actively.", CLOSE_DEVICE_TIMEOUT_MS, e);
+ logger.debug("Remote didn't close connection within the configured timeout of {} ms, shutting down actively.", CLOSE_DEVICE_TIMEOUT_MS, e);
channel.close();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
diff --git a/plc4j/utils/scraper/pom.xml b/plc4j/utils/scraper/pom.xml
index b88e8a2..9956414 100644
--- a/plc4j/utils/scraper/pom.xml
+++ b/plc4j/utils/scraper/pom.xml
@@ -49,7 +49,7 @@
</dependency>
<dependency>
<groupId>org.apache.plc4x</groupId>
- <artifactId>plc4j-connectionString-pool</artifactId>
+ <artifactId>plc4j-connection-pool</artifactId>
<version>0.3.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
@@ -59,6 +59,19 @@
<version>0.3.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ <!--TODO Remove this-->
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-collections4</artifactId>
+ </dependency>
</dependencies>
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
index dfd4801..90da56a 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
@@ -19,72 +19,135 @@
package org.apache.plc4x.java.scraper;
+import org.apache.commons.collections4.MultiValuedMap;
+import org.apache.commons.collections4.multimap.ArrayListValuedHashMap;
+import org.apache.commons.lang3.Validate;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.commons.math3.exception.MathIllegalArgumentException;
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.commons.math3.stat.descriptive.UnivariateStatistic;
import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.api.PlcConnection;
-import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
-import org.apache.plc4x.java.api.messages.PlcReadResponse;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
+import java.util.stream.IntStream;
/**
- * Plc Scraper that scrapes one source.
+ * Main class that orchestrates scraping.
*/
-public class Scraper implements Runnable {
+public class Scraper {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(Scraper.class);
+ private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10,
+ new BasicThreadFactory.Builder()
+ .namingPattern("scheduler-thread-%d")
+ .daemon(true)
+ .build()
+ );
+ private final ExecutorService handlerPool = Executors.newFixedThreadPool(4,
+ new BasicThreadFactory.Builder()
+ .namingPattern("handler-thread-%d")
+ .daemon(true)
+ .build()
+ );
+ private final MultiValuedMap<ScrapeJob, ScraperTask> tasks = new ArrayListValuedHashMap<>();
private final PlcDriverManager driverManager;
- private final String connectionString;
- private final long requestTimeoutMs;
- private final ResultHandler handler;
+ private final List<ScrapeJob> jobs;
- public Scraper(PlcDriverManager driverManager, String connectionString, long requestTimeoutMs, ResultHandler handler) {
+ public Scraper(PlcDriverManager driverManager, List<ScrapeJob> jobs) {
+ Validate.notEmpty(jobs);
this.driverManager = driverManager;
- this.connectionString = connectionString;
- this.requestTimeoutMs = requestTimeoutMs;
- this.handler = handler;
- }
+ this.jobs = jobs;
+
+ // Schedule all jobs
+ LOGGER.info("Registering jobs...");
+ jobs.stream()
+ .flatMap(job -> job.connections.entrySet().stream()
+ .map(entry -> Triple.of(job, entry.getKey(), entry.getValue()))
+ )
+ .forEach(
+ tuple -> {
+ LOGGER.debug("Register task for job {} for conn {} ({}) at rate {} ms",
+ tuple.getLeft().name, tuple.getMiddle(), tuple.getRight(), tuple.getLeft().scrapeRate);
+ ScraperTask task = new ScraperTask(driverManager,
+ tuple.getLeft().name, tuple.getMiddle(), tuple.getRight(),
+ tuple.getLeft().fields,
+ 1_000,
+ handlerPool);
+ // Add task to internal list
+ tasks.put(tuple.getLeft(), task);
+ scheduler.scheduleAtFixedRate(task,
+ 0, tuple.getLeft().scrapeRate, TimeUnit.MILLISECONDS);
+ }
+ );
- @Override
- public void run() {
- // Does a single fetch
- try (PlcConnection connection = driverManager.getConnection(connectionString)) {
- PlcReadResponse response;
- try {
- response = connection.readRequestBuilder()
- .addItem("item1", "add1")
- .build()
- .execute()
- .get(requestTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- // Handle execution exception
- handler.handleException(e);
- return;
+ // Add statistics tracker
+ scheduler.scheduleAtFixedRate(() -> {
+ for (Map.Entry<ScrapeJob, ScraperTask> entry : tasks.entries()) {
+ DescriptiveStatistics statistics = entry.getValue().getLatencyStatistics();
+ String msg = String.format(Locale.ENGLISH, "Job statistics (%s, %s) number of requests: %d (%d success, %.1f %% failed, %.1f %% too slow), mean latency: %.2f ms, median: %.2f ms",
+ entry.getValue().getJobName(), entry.getValue().getConnectionAlias(), entry.getValue().getRequestCounter(), entry.getValue().getSuccessfullRequestCounter(), entry.getValue().getPercentageFailed(), statistics.apply(new PercentageAboveThreshold(entry.getKey().scrapeRate*1e6)), statistics.getMean()*1e-6, statistics.getPercentile(50)*1e-6);
+ LOGGER.info(msg);
}
- CompletableFuture.runAsync(() -> handler.handle(transformResponseToMap(response)));
- } catch (PlcConnectionException e) {
- throw new PlcRuntimeException("Unable to fetch", e);
- } catch (Exception e) {
- throw new PlcRuntimeException("Unexpected exception during fetch", e);
- }
+ }, 1_000, 1_000, TimeUnit.MILLISECONDS);
}
- private Map<String, Object> transformResponseToMap(PlcReadResponse response) {
- return response.getFieldNames().stream()
- .collect(Collectors.toMap(
- name -> name,
- response::getObject
- ));
+ public static class ScrapeJob {
+
+ private final String name;
+ private final long scrapeRate;
+ /**
+ * alias -> connection-string
+ */
+ private final Map<String, String> connections;
+ /**
+ * alias -> field-query
+ */
+ private final Map<String, String> fields;
+
+ public ScrapeJob(String name, long scrapeRate, Map<String, String> connections, Map<String, String> fields) {
+ this.name = name;
+ this.scrapeRate = scrapeRate;
+ this.connections = connections;
+ this.fields = fields;
+ }
}
- public interface ResultHandler {
+ private static class PercentageAboveThreshold implements UnivariateStatistic {
+
+ private final double threshold;
- void handle(Map<String, Object> result);
+ public PercentageAboveThreshold(double threshold) {
+ this.threshold = threshold;
+ }
- void handleException(Exception e);
+ @Override
+ public double evaluate(double[] values) throws MathIllegalArgumentException {
+ long above = Arrays.stream(values)
+ .filter(val -> val > threshold)
+ .count();
+ return 100.0*above/values.length;
+ }
+ @Override
+ public double evaluate(double[] values, int begin, int length) throws MathIllegalArgumentException {
+ long above = IntStream.range(begin, begin + length)
+ .mapToDouble(i -> values[i])
+ .filter(val -> val > threshold)
+ .count();
+ return 100.0*above/length;
+ }
+
+ @Override
+ public UnivariateStatistic copy() {
+ return new PercentageAboveThreshold(threshold);
+ }
}
}
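
For orientation, the reworked Scraper now does all of its wiring in the constructor: for every ScrapeJob it builds one ScraperTask per connection alias and schedules it at the job's scrape rate on daemon scheduler threads, while a second scheduled task logs per-task statistics every second. A minimal usage sketch of that API follows; the connection string and field query are placeholders and not values taken from this change.

    import java.util.Collections;

    import org.apache.plc4x.java.PlcDriverManager;
    import org.apache.plc4x.java.scraper.Scraper;

    public class ScraperUsageSketch {
        public static void main(String[] args) throws InterruptedException {
            // Assumes a driver that can resolve the connection string below is on the classpath.
            PlcDriverManager driverManager = new PlcDriverManager();
            // One job, polled every 100 ms, reading one field from one connection.
            Scraper.ScrapeJob job = new Scraper.ScrapeJob(
                "demo-job",
                100,                                                    // scrape rate in ms
                Collections.singletonMap("device1", "mock:device1"),   // alias -> connection string (placeholder)
                Collections.singletonMap("field1", "some-field-query") // alias -> field query (placeholder)
            );
            // The constructor registers and schedules all tasks immediately.
            new Scraper(driverManager, Collections.singletonList(job));
            // Scheduler and handler threads are daemons, so keep the JVM alive to see output.
            Thread.sleep(5_000);
        }
    }

Because the statistics task runs every second, this also prints each task's request counts, failure percentage and latency figures to the configured logger while the sleep lasts.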
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/ScraperTask.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/ScraperTask.java
new file mode 100644
index 0000000..9bbf5e4
--- /dev/null
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/ScraperTask.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.scraper;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.commons.lang3.time.StopWatch;
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.plc4x.java.PlcDriverManager;
+import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Plc scraper task that performs a single fetch for one connection of one job.
+ */
+public class ScraperTask implements Runnable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ScraperTask.class);
+
+ private final PlcDriverManager driverManager;
+ private final String jobName;
+ private final String connectionAlias;
+ private final String connectionString;
+ private final Map<String, String> fields;
+ private final long requestTimeoutMs;
+ private final ExecutorService handlerService;
+
+ private final AtomicLong requestCounter = new AtomicLong(0);
+ private final AtomicLong successCounter = new AtomicLong(0);
+ private final DescriptiveStatistics latencyStatistics = new DescriptiveStatistics(1000);
+ private final DescriptiveStatistics failedStatistics = new DescriptiveStatistics(1000);
+
+ public ScraperTask(PlcDriverManager driverManager, String jobName, String connectionAlias, String connectionString,
+ Map<String, String> fields, long requestTimeoutMs, ExecutorService handlerService) {
+ Validate.notNull(driverManager);
+ Validate.notBlank(jobName);
+ Validate.notBlank(connectionAlias);
+ Validate.notBlank(connectionString);
+ Validate.notEmpty(fields);
+ Validate.isTrue(requestTimeoutMs > 0);
+ this.driverManager = driverManager;
+ this.jobName = jobName;
+ this.connectionAlias = connectionAlias;
+ this.connectionString = connectionString;
+ this.fields = fields;
+ this.requestTimeoutMs = requestTimeoutMs;
+ this.handlerService = handlerService;
+ }
+
+ @Override
+ public void run() {
+ // Does a single fetch
+ LOGGER.trace("Start new scrape of task of job {} for connection {}", jobName, connectionAlias);
+ requestCounter.incrementAndGet();
+ StopWatch stopWatch = new StopWatch();
+ stopWatch.start();
+ try (PlcConnection connection = driverManager.getConnection(connectionString)) {
+ LOGGER.trace("Connection to {} established: {}", connectionString, connection);
+ PlcReadResponse response;
+ try {
+ PlcReadRequest.Builder builder = connection.readRequestBuilder();
+ fields.forEach((alias,qry) -> {
+ LOGGER.trace("Requesting: {} -> {}", alias, qry);
+ builder.addItem(alias,qry);
+ });
+ response = builder
+ .build()
+ .execute()
+ .get(requestTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ // Handle execution exception
+ handleException(e);
+ return;
+ }
+ // Add statistics
+ stopWatch.stop();
+ latencyStatistics.addValue(stopWatch.getNanoTime());
+ failedStatistics.addValue(0.0);
+ successCounter.incrementAndGet();
+ // Validate response
+ validateResponse(response);
+ // Handle response (Async)
+ CompletableFuture.runAsync(() -> handle(transformResponseToMap(response)), handlerService);
+ } catch (Exception e) {
+ failedStatistics.addValue(1.0);
+ LOGGER.debug("Exception during scrape", e);
+ handleException(e);
+ }
+ }
+
+ private void validateResponse(PlcReadResponse response) {
+ Map<String, PlcResponseCode> failedFields = response.getFieldNames().stream()
+ .filter(name -> !PlcResponseCode.OK.equals(response.getResponseCode(name)))
+ .collect(Collectors.toMap(
+ Function.identity(),
+ response::getResponseCode
+ ));
+ if (failedFields.size() > 0) {
+ handleErrorResponse(failedFields);
+ }
+ }
+
+ private Map<String, Object> transformResponseToMap(PlcReadResponse response) {
+ return response.getFieldNames().stream()
+ .collect(Collectors.toMap(
+ name -> name,
+ response::getObject
+ ));
+ }
+
+ public String getJobName() {
+ return jobName;
+ }
+
+ public String getConnectionAlias() {
+ return connectionAlias;
+ }
+
+ public long getRequestCounter() {
+ return requestCounter.get();
+ }
+
+ public long getSuccessfullRequestCounter() {
+ return successCounter.get();
+ }
+
+ public DescriptiveStatistics getLatencyStatistics() {
+ return latencyStatistics;
+ }
+
+ public double getPercentageFailed() {
+ return 100.0*failedStatistics.getMean();
+ }
+
+ public void handle(Map<String, Object> result) {
+ LOGGER.debug("Handling result on gorgeous pool: {}", result);
+ }
+
+ public void handleException(Exception e) {
+ failedStatistics.addValue(1.0);
+ }
+
+ public void handleErrorResponse(Map<String, PlcResponseCode> failed) {
+ LOGGER.warn("Handling error responses: {}", failed);
+ }
+
+}
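
ScraperTask.handle(Map<String, Object>) currently only logs the scraped values, and the earlier ResultHandler interface is no longer passed in by Scraper. Since handle() is public, one way a caller could plug in its own sink is a subclass; the following is a hypothetical sketch, not something this commit provides.

    import java.util.Map;
    import java.util.concurrent.ExecutorService;

    import org.apache.plc4x.java.PlcDriverManager;
    import org.apache.plc4x.java.scraper.ScraperTask;

    // Hypothetical subclass: forwards each scrape result to a custom sink instead of only logging it.
    public class ForwardingScraperTask extends ScraperTask {

        public ForwardingScraperTask(PlcDriverManager driverManager, String jobName, String connectionAlias,
                                     String connectionString, Map<String, String> fields,
                                     long requestTimeoutMs, ExecutorService handlerService) {
            super(driverManager, jobName, connectionAlias, connectionString, fields, requestTimeoutMs, handlerService);
        }

        @Override
        public void handle(Map<String, Object> result) {
            // Called asynchronously on the handler pool, off the scheduler thread.
            result.forEach((alias, value) ->
                System.out.println(getJobName() + ";" + getConnectionAlias() + ";" + alias + ";" + value));
        }
    }

The override runs on the handler pool passed to the constructor, because run() dispatches handle() through CompletableFuture.runAsync(..., handlerService).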
diff --git a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTaskTest.java b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTaskTest.java
new file mode 100644
index 0000000..546f3cb
--- /dev/null
+++ b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTaskTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.scraper;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.plc4x.java.PlcDriverManager;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.apache.plc4x.java.base.messages.items.DefaultStringFieldItem;
+import org.apache.plc4x.java.mock.MockDevice;
+import org.apache.plc4x.java.mock.PlcMockConnection;
+import org.assertj.core.api.WithAssertions;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.*;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.*;
+
+@ExtendWith(MockitoExtension.class)
+public class ScraperTaskTest implements WithAssertions {
+
+ @Mock
+ MockDevice mockDevice;
+
+ @Test
+ public void scrape() throws PlcConnectionException {
+ PlcDriverManager driverManager = new PlcDriverManager();
+ PlcMockConnection connection = (PlcMockConnection) driverManager.getConnection("mock:scraper");
+ connection.setDevice(mockDevice);
+ when(mockDevice.read(any())).thenReturn(Pair.of(PlcResponseCode.OK, new DefaultStringFieldItem("hallo")));
+
+ ScraperTask scraperTask = new ScraperTask(driverManager, "job1", "m1", "mock:scraper", Collections.singletonMap("a", "b"),
+ 1_000, ForkJoinPool.commonPool());
+
+ scraperTask.run();
+ }
+
+ @Nested
+ class Exceptions {
+
+ @Test
+ public void badResponseCode_shouldHandleException() throws PlcConnectionException {
+ // Given
+ PlcDriverManager driverManager = new PlcDriverManager();
+ PlcMockConnection connection = (PlcMockConnection) driverManager.getConnection("mock:scraper");
+ connection.setDevice(mockDevice);
+ when(mockDevice.read(any())).thenReturn(Pair.of(PlcResponseCode.NOT_FOUND, new DefaultStringFieldItem("hallo")));
+
+ ScraperTask scraperTask = new ScraperTask(driverManager, "job1", "m1",
+ "mock:scraper", Collections.singletonMap("a", "b"), 1_000, ForkJoinPool.commonPool());
+
+ // When
+ scraperTask.run();
+ }
+
+ @Mock
+ PlcDriverManager driverManager;
+
+ @Test
+ public void handleConnectionException() throws PlcConnectionException {
+ // Given
+ when(driverManager.getConnection(anyString())).thenThrow(new PlcConnectionException("no connection"));
+
+ ScraperTask scraperTask = new ScraperTask(driverManager, "job1", "m1", "mock:scraper", Collections.singletonMap("a", "b"),
+ 1_000, ForkJoinPool.commonPool());
+
+ assertThatThrownBy(scraperTask::run)
+ .isInstanceOf(PlcRuntimeException.class)
+ .hasMessageContaining("Unable to fetch connection");
+ }
+
+ @Test
+ void runByScheduler_handledGracefully() throws PlcConnectionException {
+ when(driverManager.getConnection(anyString())).thenThrow(new PlcConnectionException("no connection"));
+ ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
+ ScraperTask scraperTask = new ScraperTask(driverManager, "job1", "m1", "mock:scraper", Collections.singletonMap("a", "b"),
+ 1_000, ForkJoinPool.commonPool());
+
+ Future<?> future = pool.scheduleAtFixedRate(scraperTask, 0, 10, TimeUnit.MILLISECONDS);
+
+ assertThat(future).isNotDone();
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java
index 25b526a..3a25f34 100644
--- a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java
+++ b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperTest.java
@@ -20,57 +20,81 @@
package org.apache.plc4x.java.scraper;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.pool2.KeyedObjectPool;
import org.apache.plc4x.java.PlcDriverManager;
+import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.types.PlcResponseCode;
-import org.apache.plc4x.java.base.messages.items.DefaultStringFieldItem;
+import org.apache.plc4x.java.base.messages.items.DefaultIntegerFieldItem;
import org.apache.plc4x.java.mock.MockDevice;
import org.apache.plc4x.java.mock.PlcMockConnection;
-import org.junit.Test;
-import org.mockito.Mockito;
+import org.apache.plc4x.java.utils.connectionpool.PoolKey;
+import org.apache.plc4x.java.utils.connectionpool.PooledPlcConnectionFactory;
+import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
-import java.util.Map;
+import java.util.Arrays;
+import java.util.Collections;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
-public class ScraperTest {
+@ExtendWith(MockitoExtension.class)
+class ScraperTest {
- @Test
- public void scrape() throws PlcConnectionException {
- PlcDriverManager driverManager = new PlcDriverManager();
- PlcMockConnection connection = (PlcMockConnection) driverManager.getConnection("mock:scraper");
- MockDevice mockDevice = Mockito.mock(MockDevice.class);
- connection.setDevice(mockDevice);
- when(mockDevice.read(any())).thenReturn(Pair.of(PlcResponseCode.OK, new DefaultStringFieldItem("hallo")));
+ @Mock
+ MockDevice mockDevice;
- Scraper scraper = new Scraper(driverManager, "mock:scraper", 1_000, new Scraper.ResultHandler() {
- @Override
- public void handle(Map<String, Object> result) {
- System.out.println(result);
- }
+ public static final String CONN_STRING_TIM = "s7://10.10.64.22/0/1";
+ public static final String FIELD_STRING_TIM = "%DB225:DBW0:INT";
+
+ public static final String CONN_STRING_CH = "s7://10.10.64.20/0/1";
+ public static final String FIELD_STRING_CH = "%DB3:DBD32:DINT";
+ @Test
+ void real_stuff() throws InterruptedException {
+ PlcDriverManager driverManager = new PooledPlcDriverManager(new PooledPlcDriverManager.PoolCreator() {
@Override
- public void handleException(Exception e) {
- System.err.println(e);
+ public KeyedObjectPool<PoolKey, PlcConnection> createPool(PooledPlcConnectionFactory pooledPlcConnectionFactory) {
+ return null;
}
});
- scraper.run();
+ Scraper scraper = new Scraper(driverManager, Arrays.asList(
+ new Scraper.ScrapeJob("job1",
+ 10,
+ Collections.singletonMap("tim", CONN_STRING_TIM),
+ Collections.singletonMap("distance", FIELD_STRING_TIM)
+ ),
+ new Scraper.ScrapeJob("job2",
+ 10,
+ Collections.singletonMap("chris", CONN_STRING_CH),
+ Collections.singletonMap("counter", FIELD_STRING_CH)
+ )
+ ));
+
+ Thread.sleep(300_000);
}
@Test
- public void scrape_badResponseCode_shouldHandleException() throws PlcConnectionException {
+ void scraper_schedulesJob() throws InterruptedException, PlcConnectionException {
PlcDriverManager driverManager = new PlcDriverManager();
- PlcMockConnection connection = (PlcMockConnection) driverManager.getConnection("mock:scraper");
- MockDevice mockDevice = Mockito.mock(MockDevice.class);
+ PlcMockConnection connection = (PlcMockConnection) driverManager.getConnection("mock:m1");
connection.setDevice(mockDevice);
- when(mockDevice.read(any())).thenReturn(Pair.of(PlcResponseCode.NOT_FOUND, new DefaultStringFieldItem("hallo")));
- Scraper.ResultHandler handler = Mockito.mock(Scraper.ResultHandler.class);
+ when(mockDevice.read(any())).thenReturn(Pair.of(PlcResponseCode.OK, new DefaultIntegerFieldItem(1)));
- Scraper scraper = new Scraper(driverManager, "mock:scraper", 1_000, null);
+ Scraper scraper = new Scraper(driverManager, Collections.singletonList(
+ new Scraper.ScrapeJob("job1",
+ 10,
+ Collections.singletonMap("m1", "mock:m1"),
+ Collections.singletonMap("field1", "qry1")
+ )
+ ));
- scraper.run();
+ Thread.sleep(5_000);
}
}
\ No newline at end of file
diff --git a/plc4j/utils/scraper/src/test/resources/logback.xml b/plc4j/utils/scraper/src/test/resources/logback-test.xml
similarity index 97%
rename from plc4j/utils/scraper/src/test/resources/logback.xml
rename to plc4j/utils/scraper/src/test/resources/logback-test.xml
index f0f2b2e..c562020 100644
--- a/plc4j/utils/scraper/src/test/resources/logback.xml
+++ b/plc4j/utils/scraper/src/test/resources/logback-test.xml
@@ -29,7 +29,7 @@
</encoder>
</appender>
- <root level="ERROR">
+ <root level="INFO">
<appender-ref ref="STDOUT"/>
</root>