You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by tm...@apache.org on 2019/05/08 15:08:32 UTC

[plc4x] 01/02: Merge branch 'scraper_refactoring_and_improvement' into feature/jmx-for-scraper-integration

This is an automated email from the ASF dual-hosted git repository.

tmitsch pushed a commit to branch feature/jmx-for-scraper-integration
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 777a5959ec7d568433c0eebf0276f745a46269f0
Merge: 98d8386 6ba3f5c
Author: Tim Mitsch <t....@pragmaticindustries.de>
AuthorDate: Wed May 8 16:31:59 2019 +0200

    Merge branch 'scraper_refactoring_and_improvement' into feature/jmx-for-scraper-integration
    
    # Conflicts:
    #	plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java
    #	plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTask.java
    #	plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerConfiguration.java
    #	plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerHandler.java

 .../org/apache/plc4x/java/s7/model/S7Field.java    |  34 +-
 .../plc4x/java/s7/netty/Plc4XS7Protocol.java       |  99 +++
 .../org/apache/plc4x/java/s7/netty/S7Protocol.java |  45 +-
 .../java/s7/netty/model/types/TransportSize.java   |   7 +-
 .../strategies/DefaultS7MessageProcessor.java      |   1 +
 plc4j/utils/scraper/README.md                      |  21 +
 .../triggeredscraper/TriggeredScraperImpl.java     | 208 ++++--
 .../triggeredscraper/TriggeredScraperTask.java     |  63 +-
 .../triggerhandler/TriggerConfiguration.java       | 709 +++++++++++++++------
 .../triggerhandler/TriggerHandler.java             | 122 +---
 .../triggerhandler/TriggerHandlerImpl.java         | 113 ++++
 .../triggerhandler/collector/TriggerCollector.java |  61 ++
 .../collector/TriggerCollectorImpl.java            | 335 ++++++++++
 .../plc4x/java/scraper/TriggeredScraperRunner.java |  12 +-
 .../java/scraper/TriggeredScraperRunnerModbus.java |  12 +-
 .../triggerhandler/TriggerConfigurationTest.java   |  60 +-
 .../test/resources/example_triggered_scraper.yml   |  22 +-
 17 files changed, 1490 insertions(+), 434 deletions(-)

diff --cc plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java
index 46cf44f,4b76214..84a9c9f
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java
@@@ -56,11 -60,10 +62,12 @@@ import java.util.stream.Collectors
   *     right now boolean variables as well as numeric variables could be used as data-types
   *     available comparators are ==,!= for all data-types and &gt;,&gt;=,&lt;,&lt;= for numeric data-types
   */
 -public class TriggeredScraperImpl implements Scraper {
 +public class TriggeredScraperImpl implements Scraper, TriggeredScraperImplMBean {
  
      private static final Logger LOGGER = LoggerFactory.getLogger(TriggeredScraperImpl.class);
 +    public static final String MX_DOMAIN = "org.apache.plc4x.java";
 +
+     private static final int DEFAULT_FUTURE_TIME_OUT = 2000;
  
      private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10,
          new BasicThreadFactory.Builder()
@@@ -78,36 -81,51 +85,67 @@@
      private final ResultHandler resultHandler;
  
      private final MultiValuedMap<ScrapeJob, ScraperTask> tasks = new ArrayListValuedHashMap<>();
-     private final MultiValuedMap<ScraperTask, ScheduledFuture<?>> futures = new ArrayListValuedHashMap<>();
+     private final MultiValuedMap<ScraperTask, ScheduledFuture<?>> scraperTaskMap = new ArrayListValuedHashMap<>();
      private final PlcDriverManager driverManager;
      private final List<ScrapeJob> jobs;
 +    private MBeanServer mBeanServer;
  
+     private long futureTimeOut;
+ 
+     private final TriggerCollector triggerCollector;
+ 
      /**
       * Creates a Scraper instance from a configuration.
       * By default a {@link PooledPlcDriverManager} is used.
       * @param config Configuration to use.
-      * @param resultHandler
+      * @param resultHandler handler that defines the processing of acquired data
       */
-     public TriggeredScraperImpl(TriggeredScraperConfiguration config, ResultHandler resultHandler) throws ScraperException {
-         this(resultHandler, createPooledDriverManager(), config.getJobs());
+     public TriggeredScraperImpl(TriggeredScraperConfiguration config, ResultHandler resultHandler,TriggerCollector triggerCollector) throws ScraperException {
+         this(resultHandler, createPooledDriverManager(), config.getJobs(),triggerCollector,DEFAULT_FUTURE_TIME_OUT);
+     }
+ 
+     /**
+      * Creates a Scraper instance from a configuration.
+      * @param config Configuration to use.
+      * @param plcDriverManager external DriverManager
+      * @param resultHandler handler that defines the processing of acquired data
+      */
+     public TriggeredScraperImpl(TriggeredScraperConfiguration config, PlcDriverManager plcDriverManager, ResultHandler resultHandler,TriggerCollector triggerCollector) throws ScraperException {
+         this(resultHandler, plcDriverManager, config.getJobs(),triggerCollector,DEFAULT_FUTURE_TIME_OUT);
+     }
+ 
+     /**
+      * Creates a Scraper instance from a configuration.
+      * @param plcDriverManager external DriverManager
+      * @param resultHandler handler that defines the processing of acquired data
+      * @param jobs list of jobs that scraper shall handle
+      * @param triggerCollector a collection that centralizes the trigger requests and joins them to grouped plc requests
+      * @param futureTimeOut max duration of future to return a result
+      */
+     public TriggeredScraperImpl(ResultHandler resultHandler, PlcDriverManager plcDriverManager, List<ScrapeJob> jobs,TriggerCollector triggerCollector, long futureTimeOut) {
+         this.resultHandler = resultHandler;
+         Validate.notEmpty(jobs);
+         this.driverManager = plcDriverManager;
+         this.jobs = jobs;
+         this.triggerCollector = triggerCollector;
+         this.futureTimeOut = futureTimeOut;
      }
  
 +
 +    public TriggeredScraperImpl(ResultHandler resultHandler, PlcDriverManager driverManager, List<ScrapeJob> jobs) {
 +        this.resultHandler = resultHandler;
 +        Validate.notEmpty(jobs);
 +        this.driverManager = driverManager;
 +        this.jobs = jobs;
 +        // Register MBean
 +        mBeanServer = ManagementFactory.getPlatformMBeanServer();
 +        try {
 +            mBeanServer.registerMBean(this, new ObjectName(MX_DOMAIN, "scraper", "scraper"));
 +        } catch (InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException | MalformedObjectNameException e) {
 +            LOGGER.debug("Unable to register Scraper as MBean", e);
 +        }
 +    }
 +
      /**
       * Min Idle per Key is set to 1 for situations where the network is broken.
       * Then, on reconnect we can fail all getConnection calls (in the ScraperTask) fast until
@@@ -123,6 -141,6 +161,14 @@@
          });
      }
  
++
++    public TriggeredScraperImpl(ResultHandler resultHandler, PlcDriverManager driverManager, List<ScrapeJob> jobs) {
++        this.resultHandler = resultHandler;
++        Validate.notEmpty(jobs);
++        this.driverManager = driverManager;
++        this.jobs = jobs;
++    }
++
      /**
       * Start the scraping.
       */
@@@ -131,41 -149,44 +177,45 @@@
      public void start() {
          // Schedule all jobs
          LOGGER.info("Starting jobs...");
-         jobs.stream()
-             .flatMap(job -> job.getSourceConnections().entrySet().stream()
-                 .map(entry -> Triple.of(job, entry.getKey(), entry.getValue()))
-             )
-             .forEach(
-                 tuple -> {
-                     LOGGER.debug("Register task for job {} for conn {} ({}) at rate {} ms",
-                         tuple.getLeft().getJobName(), tuple.getMiddle(), tuple.getRight(), tuple.getLeft().getScrapeRate());
-                     TriggeredScraperTask task =
-                         null;
-                     try {
-                         task = new TriggeredScraperTask(driverManager,
-                             tuple.getLeft().getJobName(),
-                             tuple.getMiddle(),
-                             tuple.getRight(),
-                             tuple.getLeft().getFields(),
-                             1_000,
-                             executorService,
-                             resultHandler,
-                             (TriggeredScrapeJobImpl) tuple.getLeft());
-                         // Register task mxbean
-                         registerTaskMBean(task);
-                         // Add task to internal list
-                         tasks.put(tuple.getLeft(), task);
-                         ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(task,
-                             0, tuple.getLeft().getScrapeRate(), TimeUnit.MILLISECONDS);
- 
-                         // Store the handle for stopping, etc.
-                         futures.put(task, future);
-                     } catch (ScraperException e) {
-                         LOGGER.warn("Error executing the job {} for conn {} ({}) at rate {} ms",tuple.getLeft().getJobName(), tuple.getMiddle(), tuple.getRight(), tuple.getLeft().getScrapeRate(),e);
-                     }
+         //start iterating over all available jobs
+         for(ScrapeJob job:jobs){
+             //iterate over all sources the jobs shall be performed on
+             for(Map.Entry<String,String> sourceEntry:job.getSourceConnections().entrySet()){
+                 LOGGER.debug("Register task for job {} for conn {} ({}) at rate {} ms",
+                     job.getJobName(),
+                     sourceEntry.getKey(),
+                     sourceEntry.getValue(),
+                     job.getScrapeRate());
+ 
+                 //create the regarding triggered scraper task
+                 TriggeredScraperTask triggeredScraperTask;
+                 try {
+                     triggeredScraperTask = new TriggeredScraperTask(
+                         driverManager,
+                         job.getJobName(),
+                         sourceEntry.getKey(),
+                         sourceEntry.getValue(),
+                         job.getFields(),
+                         futureTimeOut,
+                         executorService,
+                         resultHandler,
+                         (TriggeredScrapeJobImpl) job,
+                         triggerCollector);
  
+                     // Add task to internal list
+                     LOGGER.info("Task {} added to scheduling", triggeredScraperTask);
++                    registerTaskMBean(task);
+                     tasks.put(job, triggeredScraperTask);
+                     ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(triggeredScraperTask, 0, job.getScrapeRate(), TimeUnit.MILLISECONDS);
+ 
+                     // Store the handle for stopping, etc.
+                     scraperTaskMap.put(triggeredScraperTask, future);
+                 } catch (ScraperException e) {
+                     LOGGER.warn("Error executing the job {} for conn {} ({}) at rate {} ms",job.getJobName(), sourceEntry.getKey(), sourceEntry.getValue(), job.getScrapeRate(),e);
                  }
-             );
+             }
+ 
+         }
  
          // Add statistics tracker
          scheduler.scheduleAtFixedRate(() -> {
@@@ -182,19 -203,11 +232,18 @@@
          }, 1_000, 1_000, TimeUnit.MILLISECONDS);
      }
  
 -    @Override
 -    public int getNumberOfActiveTasks() {
 -        return 0;
 +    /**
 +     * Register a task as MBean
 +     * @param task task to register
 +     */
 +    private void registerTaskMBean(ScraperTask task) {
 +        try {
 +            mBeanServer.registerMBean(task, new ObjectName(MX_DOMAIN + ":type=ScrapeTask,name=" + task.getJobName() + "-" + task.getConnectionAlias()));
 +        } catch (InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException | MalformedObjectNameException e) {
 +            LOGGER.debug("Unable to register Task as MBean", e);
 +        }
      }
  
- 
      @Override
      public void stop() {
          // Stop all futures
@@@ -204,19 -217,83 +253,96 @@@
              entry.getValue().cancel(true);
          }
          // Clear the map
-         futures.clear();
+         scraperTaskMap.clear();
+     }
+ 
+     /**
+      * acquires a plc connection from connection pool
+      * @param plcDriverManager  Driver manager handling connection and pools
+      * @param connectionString  Connection string as defined in the regarding implementation of {@link PlcDriver}
+      * @param executorService   ExecutorService holding a pool of threads handling requests
+      * @param requestTimeoutMs  maximum time to wait for the future to return a result
+      * @param info              additional info for trace reasons
+      * @return the {@link PlcConnection} used for acquiring data from PLC endpoint
+      * @throws InterruptedException
+      * @throws ExecutionException
+      * @throws TimeoutException
+      */
+     public static PlcConnection getPlcConnection(PlcDriverManager plcDriverManager,
+                                                  String connectionString,
+                                                  ExecutorService executorService,
+                                                  long requestTimeoutMs,
+                                                  String info) throws InterruptedException, ExecutionException, TimeoutException {
+         if(!info.isEmpty() && LOGGER.isTraceEnabled()){
+             LOGGER.trace("Additional Info from caller {}", info);
+         }
+         CompletableFuture<PlcConnection> future = CompletableFuture.supplyAsync(() -> {
+             try {
+                 return plcDriverManager.getConnection(connectionString);
+             } catch (PlcConnectionException e) {
+                 LOGGER.warn("Unable to instantiate connection to " + connectionString, e);
+                 throw new PlcRuntimeException(e);
+             }
+             catch (Exception e){
+                 LOGGER.warn("Unable to instantiate connection to " + connectionString, e);
+                 throw new PlcRuntimeException(e);
+             }
+         }, executorService);
+         if(LOGGER.isTraceEnabled()){
+             LOGGER.trace("try to get a connection to {}", connectionString);
+         }
+         PlcConnection plcConnection=null;
+         try {
+             plcConnection = future.get(requestTimeoutMs, TimeUnit.MILLISECONDS);
+         }
+         catch (Exception e){
+             LOGGER.trace("Additional Info from caller {}", info,e);
+             throw e;
+         }
+         return plcConnection;
+     }
+ 
+     /**
+      * acquires a plc connection from connection pool
+      * @param plcDriverManager  Driver manager handling connection and pools
+      * @param connectionString  Connection string as defined in the regarding implementation of {@link PlcDriver}
+      * @param executorService   ExecutorService holding a pool of threads handling requests
+      * @param requestTimeoutMs  maximum time to wait for the future to return a result
+      * @return the {@link PlcConnection} used for acquiring data from PLC endpoint
+      * @throws InterruptedException
+      * @throws ExecutionException
+      * @throws TimeoutException
+      */
+     public static PlcConnection getPlcConnection(PlcDriverManager plcDriverManager,
+                                                  String connectionString,
+                                                  ExecutorService executorService,
+                                                  long requestTimeoutMs) throws InterruptedException, ExecutionException, TimeoutException {
+         return getPlcConnection(plcDriverManager,connectionString,executorService,requestTimeoutMs,"");
+     }
+ 
+     /**
+      * transforms the results from a {@link PlcReadResponse} into a map
+      * @param plcReadResponse response that shall be converted to map for further processing
+      * @return the converted map
+      */
+     public static Map<String, Object> convertPlcResponseToMap(PlcReadResponse plcReadResponse) {
+         return plcReadResponse.getFieldNames().stream()
+             .collect(Collectors.toMap(
+                 name -> name,
+                 plcReadResponse::getObject
+             ));
      }
 +
 +
 +    // MBean methods
 +    @Override
 +    public boolean isRunning() {
 +        // TODO is this okay so?
 +        return !futures.isEmpty();
 +    }
 +
 +    @Override
 +    public int getNumberOfActiveTasks() {
 +        return (int) futures.entries().stream().filter(entry -> !entry.getValue().isDone()).count();
 +    }
  }
diff --cc plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTask.java
index 5cc7aaf,dc0d676..c0e888c
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTask.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTask.java
@@@ -89,10 -93,12 +91,13 @@@ public class TriggeredScraperTask imple
      @Override
      //ToDo code-refactoring and improved testing --> PLC4X-90
      public void run() {
 +        LOGGER.trace("Check condition for task of job {} for connection {}", jobName, connectionAlias);
          if(this.triggerHandler.checkTrigger()) {
              // Does a single fetch only when trigger is valid
-             LOGGER.trace("Start new scrape of task of job {} for connection {}", jobName, connectionAlias);
+             LOGGER.info("Trigger for job {} and device {} is met ... scraping desired data",jobName,connectionAlias);
+             if(LOGGER.isTraceEnabled()) {
+                 LOGGER.trace("Start new scrape of task of job {} for connection {}", jobName, connectionAlias);
+             }
              requestCounter.incrementAndGet();
              StopWatch stopWatch = new StopWatch();
              stopWatch.start();
@@@ -219,30 -228,17 +222,44 @@@
          return requestTimeoutMs;
      }
  
+     @Override
+     public String toString() {
+         return "TriggeredScraperTask{" +
+             "driverManager=" + driverManager +
+             ", jobName='" + jobName + '\'' +
+             ", connectionAlias='" + connectionAlias + '\'' +
+             ", connectionString='" + connectionString + '\'' +
+             ", requestTimeoutMs=" + requestTimeoutMs +
+             ", executorService=" + executorService +
+             ", resultHandler=" + resultHandler +
+             ", triggerHandler=" + triggerHandler +
+             '}';
+     }
++
 +    //---------------------------------
 +    // JMX Monitoring
 +    //---------------------------------
 +    @Override
 +    public long getScrapesTotal() {
 +        return requestCounter.get();
 +    }
 +
 +    @Override
 +    public long getScrapesSuccess() {
 +        return successCounter.get();
 +    }
 +
 +    @Override
 +    public double getPercentageFailed() {
 +        return 100.0 - (double)this.getScrapesSuccess()/this.getScrapesTotal() * 100.0;
 +    }
 +
 +    @Override
 +    public String[] getPercentiles() {
 +        String[] percentiles = new String[10];
 +        for (int i = 1; i <= 10; i += 1) {
 +            percentiles[i - 1] = String.format("%d%%: %s ms", 10 * i, latencyStatistics.getPercentile(10.0 * i) * 1e-6);
 +        }
 +        return percentiles;
 +    }
  }