You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by tm...@apache.org on 2019/05/08 15:08:32 UTC
[plc4x] 01/02: Merge branch 'scraper_refactoring_and_improvement'
into feature/jmx-for-scraper-integration
This is an automated email from the ASF dual-hosted git repository.
tmitsch pushed a commit to branch feature/jmx-for-scraper-integration
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 777a5959ec7d568433c0eebf0276f745a46269f0
Merge: 98d8386 6ba3f5c
Author: Tim Mitsch <t....@pragmaticindustries.de>
AuthorDate: Wed May 8 16:31:59 2019 +0200
Merge branch 'scraper_refactoring_and_improvement' into feature/jmx-for-scraper-integration
# Conflicts:
# plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java
# plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTask.java
# plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerConfiguration.java
# plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerHandler.java
.../org/apache/plc4x/java/s7/model/S7Field.java | 34 +-
.../plc4x/java/s7/netty/Plc4XS7Protocol.java | 99 +++
.../org/apache/plc4x/java/s7/netty/S7Protocol.java | 45 +-
.../java/s7/netty/model/types/TransportSize.java | 7 +-
.../strategies/DefaultS7MessageProcessor.java | 1 +
plc4j/utils/scraper/README.md | 21 +
.../triggeredscraper/TriggeredScraperImpl.java | 208 ++++--
.../triggeredscraper/TriggeredScraperTask.java | 63 +-
.../triggerhandler/TriggerConfiguration.java | 709 +++++++++++++++------
.../triggerhandler/TriggerHandler.java | 122 +---
.../triggerhandler/TriggerHandlerImpl.java | 113 ++++
.../triggerhandler/collector/TriggerCollector.java | 61 ++
.../collector/TriggerCollectorImpl.java | 335 ++++++++++
.../plc4x/java/scraper/TriggeredScraperRunner.java | 12 +-
.../java/scraper/TriggeredScraperRunnerModbus.java | 12 +-
.../triggerhandler/TriggerConfigurationTest.java | 60 +-
.../test/resources/example_triggered_scraper.yml | 22 +-
17 files changed, 1490 insertions(+), 434 deletions(-)
diff --cc plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java
index 46cf44f,4b76214..84a9c9f
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java
@@@ -56,11 -60,10 +62,12 @@@ import java.util.stream.Collectors
* right now boolean variables as well as numeric variables could be used as data-types
* available comparators are ==,!= for all data-types and >,>=,<,<= for numeric data-types
*/
-public class TriggeredScraperImpl implements Scraper {
+public class TriggeredScraperImpl implements Scraper, TriggeredScraperImplMBean {
private static final Logger LOGGER = LoggerFactory.getLogger(TriggeredScraperImpl.class);
+ public static final String MX_DOMAIN = "org.apache.plc4x.java";
+
+ private static final int DEFAULT_FUTURE_TIME_OUT = 2000;
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10,
new BasicThreadFactory.Builder()
@@@ -78,36 -81,51 +85,67 @@@
private final ResultHandler resultHandler;
private final MultiValuedMap<ScrapeJob, ScraperTask> tasks = new ArrayListValuedHashMap<>();
- private final MultiValuedMap<ScraperTask, ScheduledFuture<?>> futures = new ArrayListValuedHashMap<>();
+ private final MultiValuedMap<ScraperTask, ScheduledFuture<?>> scraperTaskMap = new ArrayListValuedHashMap<>();
private final PlcDriverManager driverManager;
private final List<ScrapeJob> jobs;
+ private MBeanServer mBeanServer;
+ private long futureTimeOut;
+
+ private final TriggerCollector triggerCollector;
+
/**
* Creates a Scraper instance from a configuration.
* By default a {@link PooledPlcDriverManager} is used.
* @param config Configuration to use.
- * @param resultHandler
+ * @param resultHandler handler that defines the processing of acquired data
*/
- public TriggeredScraperImpl(TriggeredScraperConfiguration config, ResultHandler resultHandler) throws ScraperException {
- this(resultHandler, createPooledDriverManager(), config.getJobs());
+ public TriggeredScraperImpl(TriggeredScraperConfiguration config, ResultHandler resultHandler,TriggerCollector triggerCollector) throws ScraperException {
+ this(resultHandler, createPooledDriverManager(), config.getJobs(),triggerCollector,DEFAULT_FUTURE_TIME_OUT);
+ }
+
+ /**
+ * Creates a Scraper instance from a configuration.
+ * @param config Configuration to use.
+ * @param plcDriverManager external DriverManager
+ * @param resultHandler handler that defines the processing of acquired data
+ */
+ public TriggeredScraperImpl(TriggeredScraperConfiguration config, PlcDriverManager plcDriverManager, ResultHandler resultHandler,TriggerCollector triggerCollector) throws ScraperException {
+ this(resultHandler, plcDriverManager, config.getJobs(),triggerCollector,DEFAULT_FUTURE_TIME_OUT);
+ }
+
+ /**
+ * Creates a Scraper instance from a configuration.
+ * @param plcDriverManager external DriverManager
+ * @param resultHandler handler that defines the processing of acquired data
+ * @param jobs list of jobs that scraper shall handle
+ * @param triggerCollector a collection that centralizes the trigger requests and joins them to grouped plc requests
+ * @param futureTimeOut max duration of future to return a result
+ */
+ public TriggeredScraperImpl(ResultHandler resultHandler, PlcDriverManager plcDriverManager, List<ScrapeJob> jobs,TriggerCollector triggerCollector, long futureTimeOut) {
+ this.resultHandler = resultHandler;
+ Validate.notEmpty(jobs);
+ this.driverManager = plcDriverManager;
+ this.jobs = jobs;
+ this.triggerCollector = triggerCollector;
+ this.futureTimeOut = futureTimeOut;
}
+
+ public TriggeredScraperImpl(ResultHandler resultHandler, PlcDriverManager driverManager, List<ScrapeJob> jobs) {
+ this.resultHandler = resultHandler;
+ Validate.notEmpty(jobs);
+ this.driverManager = driverManager;
+ this.jobs = jobs;
+ // Register MBean
+ mBeanServer = ManagementFactory.getPlatformMBeanServer();
+ try {
+ mBeanServer.registerMBean(this, new ObjectName(MX_DOMAIN, "scraper", "scraper"));
+ } catch (InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException | MalformedObjectNameException e) {
+ LOGGER.debug("Unable to register Scraper as MBean", e);
+ }
+ }
+
/**
* Min Idle per Key is set to 1 for situations where the network is broken.
* Then, on reconnect we can fail all getConnection calls (in the ScraperTask) fast until
@@@ -123,6 -141,6 +161,14 @@@
});
}
++
++ public TriggeredScraperImpl(ResultHandler resultHandler, PlcDriverManager driverManager, List<ScrapeJob> jobs) {
++ this.resultHandler = resultHandler;
++ Validate.notEmpty(jobs);
++ this.driverManager = driverManager;
++ this.jobs = jobs;
++ }
++
/**
* Start the scraping.
*/
@@@ -131,41 -149,44 +177,45 @@@
public void start() {
// Schedule all jobs
LOGGER.info("Starting jobs...");
- jobs.stream()
- .flatMap(job -> job.getSourceConnections().entrySet().stream()
- .map(entry -> Triple.of(job, entry.getKey(), entry.getValue()))
- )
- .forEach(
- tuple -> {
- LOGGER.debug("Register task for job {} for conn {} ({}) at rate {} ms",
- tuple.getLeft().getJobName(), tuple.getMiddle(), tuple.getRight(), tuple.getLeft().getScrapeRate());
- TriggeredScraperTask task =
- null;
- try {
- task = new TriggeredScraperTask(driverManager,
- tuple.getLeft().getJobName(),
- tuple.getMiddle(),
- tuple.getRight(),
- tuple.getLeft().getFields(),
- 1_000,
- executorService,
- resultHandler,
- (TriggeredScrapeJobImpl) tuple.getLeft());
- // Register task mxbean
- registerTaskMBean(task);
- // Add task to internal list
- tasks.put(tuple.getLeft(), task);
- ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(task,
- 0, tuple.getLeft().getScrapeRate(), TimeUnit.MILLISECONDS);
-
- // Store the handle for stopping, etc.
- futures.put(task, future);
- } catch (ScraperException e) {
- LOGGER.warn("Error executing the job {} for conn {} ({}) at rate {} ms",tuple.getLeft().getJobName(), tuple.getMiddle(), tuple.getRight(), tuple.getLeft().getScrapeRate(),e);
- }
+ //start iterating over all available jobs
+ for(ScrapeJob job:jobs){
+ //iterate over all sources the job shall be performed on
+ for(Map.Entry<String,String> sourceEntry:job.getSourceConnections().entrySet()){
+ LOGGER.debug("Register task for job {} for conn {} ({}) at rate {} ms",
+ job.getJobName(),
+ sourceEntry.getKey(),
+ sourceEntry.getValue(),
+ job.getScrapeRate());
+
+ //create the corresponding triggered scraper task
+ TriggeredScraperTask triggeredScraperTask;
+ try {
+ triggeredScraperTask = new TriggeredScraperTask(
+ driverManager,
+ job.getJobName(),
+ sourceEntry.getKey(),
+ sourceEntry.getValue(),
+ job.getFields(),
+ futureTimeOut,
+ executorService,
+ resultHandler,
+ (TriggeredScrapeJobImpl) job,
+ triggerCollector);
+ // Add task to internal list
+ LOGGER.info("Task {} added to scheduling", triggeredScraperTask);
++ registerTaskMBean(task);
+ tasks.put(job, triggeredScraperTask);
+ ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(triggeredScraperTask, 0, job.getScrapeRate(), TimeUnit.MILLISECONDS);
+
+ // Store the handle for stopping, etc.
+ scraperTaskMap.put(triggeredScraperTask, future);
+ } catch (ScraperException e) {
+ LOGGER.warn("Error executing the job {} for conn {} ({}) at rate {} ms",job.getJobName(), sourceEntry.getKey(), sourceEntry.getValue(), job.getScrapeRate(),e);
}
- );
+ }
+
+ }
// Add statistics tracker
scheduler.scheduleAtFixedRate(() -> {
@@@ -182,19 -203,11 +232,18 @@@
}, 1_000, 1_000, TimeUnit.MILLISECONDS);
}
- @Override
- public int getNumberOfActiveTasks() {
- return 0;
+ /**
+ * Register a task as MBean
+ * @param task task to register
+ */
+ private void registerTaskMBean(ScraperTask task) {
+ try {
+ mBeanServer.registerMBean(task, new ObjectName(MX_DOMAIN + ":type=ScrapeTask,name=" + task.getJobName() + "-" + task.getConnectionAlias()));
+ } catch (InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException | MalformedObjectNameException e) {
+ LOGGER.debug("Unable to register Task as MBean", e);
+ }
}
-
@Override
public void stop() {
// Stop all futures
@@@ -204,19 -217,83 +253,96 @@@
entry.getValue().cancel(true);
}
// Clear the map
- futures.clear();
+ scraperTaskMap.clear();
+ }
+
+ /**
+ * acquires a plc connection from connection pool
+ * @param plcDriverManager Driver manager handling connection and pools
+ * @param connectionString Connection string as defined in the corresponding implementation of {@link PlcDriver}
+ * @param executorService ExecutorService holding a pool of threads that handle requests
+ * @param requestTimeoutMs maximum time to wait for the future to return a result
+ * @param info additional info for trace reasons
+ * @return the {@link PlcConnection} used for acquiring data from PLC endpoint
+ * @throws InterruptedException
+ * @throws ExecutionException
+ * @throws TimeoutException
+ */
+ public static PlcConnection getPlcConnection(PlcDriverManager plcDriverManager,
+ String connectionString,
+ ExecutorService executorService,
+ long requestTimeoutMs,
+ String info) throws InterruptedException, ExecutionException, TimeoutException {
+ if(!info.isEmpty() && LOGGER.isTraceEnabled()){
+ LOGGER.trace("Additional Info from caller {}", info);
+ }
+ CompletableFuture<PlcConnection> future = CompletableFuture.supplyAsync(() -> {
+ try {
+ return plcDriverManager.getConnection(connectionString);
+ } catch (PlcConnectionException e) {
+ LOGGER.warn("Unable to instantiate connection to " + connectionString, e);
+ throw new PlcRuntimeException(e);
+ }
+ catch (Exception e){
+ LOGGER.warn("Unable to instantiate connection to " + connectionString, e);
+ throw new PlcRuntimeException(e);
+ }
+ }, executorService);
+ if(LOGGER.isTraceEnabled()){
+ LOGGER.trace("try to get a connection to {}", connectionString);
+ }
+ PlcConnection plcConnection=null;
+ try {
+ plcConnection = future.get(requestTimeoutMs, TimeUnit.MILLISECONDS);
+ }
+ catch (Exception e){
+ LOGGER.trace("Additional Info from caller {}", info,e);
+ throw e;
+ }
+ return plcConnection;
+ }
+
+ /**
+ * acquires a plc connection from connection pool
+ * @param plcDriverManager Driver manager handling connection and pools
+ * @param connectionString Connection string as defined in the corresponding implementation of {@link PlcDriver}
+ * @param executorService ExecutorService holding a pool of threads that handle requests
+ * @param requestTimeoutMs maximum time to wait for the future to return a result
+ * @return the {@link PlcConnection} used for acquiring data from PLC endpoint
+ * @throws InterruptedException
+ * @throws ExecutionException
+ * @throws TimeoutException
+ */
+ public static PlcConnection getPlcConnection(PlcDriverManager plcDriverManager,
+ String connectionString,
+ ExecutorService executorService,
+ long requestTimeoutMs) throws InterruptedException, ExecutionException, TimeoutException {
+ return getPlcConnection(plcDriverManager,connectionString,executorService,requestTimeoutMs,"");
+ }
+
+ /**
+ * transforms the results from a {@link PlcReadResponse} into a map
+ * @param plcReadResponse response that shall be converted to map for further processing
+ * @return the converted map
+ */
+ public static Map<String, Object> convertPlcResponseToMap(PlcReadResponse plcReadResponse) {
+ return plcReadResponse.getFieldNames().stream()
+ .collect(Collectors.toMap(
+ name -> name,
+ plcReadResponse::getObject
+ ));
}
+
+
+ // MBean methods
+ @Override
+ public boolean isRunning() {
+ // TODO is this okay so?
+ return !futures.isEmpty();
+ }
+
+ @Override
+ public int getNumberOfActiveTasks() {
+ return (int) futures.entries().stream().filter(entry -> !entry.getValue().isDone()).count();
+ }
}
diff --cc plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTask.java
index 5cc7aaf,dc0d676..c0e888c
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTask.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTask.java
@@@ -89,10 -93,12 +91,13 @@@ public class TriggeredScraperTask imple
@Override
//ToDo code-refactoring and improved testing --> PLC4X-90
public void run() {
+ LOGGER.trace("Check condition for task of job {} for connection {}", jobName, connectionAlias);
if(this.triggerHandler.checkTrigger()) {
// Does a single fetch only when trigger is valid
- LOGGER.trace("Start new scrape of task of job {} for connection {}", jobName, connectionAlias);
+ LOGGER.info("Trigger for job {} and device {} is met ... scraping desired data",jobName,connectionAlias);
+ if(LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Start new scrape of task of job {} for connection {}", jobName, connectionAlias);
+ }
requestCounter.incrementAndGet();
StopWatch stopWatch = new StopWatch();
stopWatch.start();
@@@ -219,30 -228,17 +222,44 @@@
return requestTimeoutMs;
}
+ @Override
+ public String toString() {
+ return "TriggeredScraperTask{" +
+ "driverManager=" + driverManager +
+ ", jobName='" + jobName + '\'' +
+ ", connectionAlias='" + connectionAlias + '\'' +
+ ", connectionString='" + connectionString + '\'' +
+ ", requestTimeoutMs=" + requestTimeoutMs +
+ ", executorService=" + executorService +
+ ", resultHandler=" + resultHandler +
+ ", triggerHandler=" + triggerHandler +
+ '}';
+ }
++
+ //---------------------------------
+ // JMX Monitoring
+ //---------------------------------
+ @Override
+ public long getScrapesTotal() {
+ return requestCounter.get();
+ }
+
+ @Override
+ public long getScrapesSuccess() {
+ return successCounter.get();
+ }
+
+ @Override
+ public double getPercentageFailed() {
+ return 100.0 - (double)this.getScrapesSuccess()/this.getScrapesTotal() * 100.0;
+ }
+
+ @Override
+ public String[] getPercentiles() {
+ String[] percentiles = new String[10];
+ for (int i = 1; i <= 10; i += 1) {
+ percentiles[i - 1] = String.format("%d%%: %s ms", 10 * i, latencyStatistics.getPercentile(10.0 * i) * 1e-6);
+ }
+ return percentiles;
+ }
}