You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by tm...@apache.org on 2019/05/08 12:10:46 UTC

[plc4x] branch scraper_refactoring_and_improvement updated: fixed a bug when scraping multiple sources

This is an automated email from the ASF dual-hosted git repository.

tmitsch pushed a commit to branch scraper_refactoring_and_improvement
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/scraper_refactoring_and_improvement by this push:
     new 7c03131  fixed a bug when scraping multiple sources
7c03131 is described below

commit 7c031314e0064ece85bcaa9b1fd3f4f81bddd0b1
Author: Tim Mitsch <t....@pragmaticindustries.de>
AuthorDate: Wed May 8 14:10:40 2019 +0200

    fixed a bug when scraping multiple sources
---
 .../triggeredscraper/TriggeredScraperImpl.java     | 38 ++++++++++++++++++++--
 .../triggeredscraper/TriggeredScraperTask.java     |  9 +++--
 .../triggerhandler/TriggerConfiguration.java       | 18 +++++++++-
 .../triggerhandler/TriggerHandlerImpl.java         |  1 +
 .../collector/TriggerCollectorImpl.java            | 22 +++++++++----
 5 files changed, 77 insertions(+), 11 deletions(-)

diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java
index 7de9e56..4b76214 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java
@@ -226,6 +226,7 @@ public class TriggeredScraperImpl implements Scraper {
      * @param connectionString  Connection string as defined in the regarding implementation of {@link PlcDriver}
      * @param executorService   ExecuterService holding a pool as threads handling requests and stuff
      * @param requestTimeoutMs  maximum awaiting for the the future to return a result
+     * @param info              additional caller-supplied info, logged at trace level; must not be null
      * @return the {@link PlcConnection} used for acquiring data from PLC endpoint
      * @throws InterruptedException
      * @throws ExecutionException
@@ -234,7 +235,11 @@ public class TriggeredScraperImpl implements Scraper {
     public static PlcConnection getPlcConnection(PlcDriverManager plcDriverManager,
                                                  String connectionString,
                                                  ExecutorService executorService,
-                                                 long requestTimeoutMs) throws InterruptedException, ExecutionException, TimeoutException {
+                                                 long requestTimeoutMs,
+                                                 String info) throws InterruptedException, ExecutionException, TimeoutException {
+        if(!info.isEmpty() && LOGGER.isTraceEnabled()){
+            LOGGER.trace("Additional Info from caller {}", info);
+        }
         CompletableFuture<PlcConnection> future = CompletableFuture.supplyAsync(() -> {
             try {
                 return plcDriverManager.getConnection(connectionString);
@@ -247,7 +252,36 @@ public class TriggeredScraperImpl implements Scraper {
                 throw new PlcRuntimeException(e);
             }
         }, executorService);
-        return future.get(requestTimeoutMs, TimeUnit.MILLISECONDS);
+        if(LOGGER.isTraceEnabled()){
+            LOGGER.trace("try to get a connection to {}", connectionString);
+        }
+        PlcConnection plcConnection=null;
+        try {
+            plcConnection = future.get(requestTimeoutMs, TimeUnit.MILLISECONDS);
+        }
+        catch (Exception e){
+            LOGGER.trace("Additional Info from caller {}", info,e);
+            throw e;
+        }
+        return plcConnection;
+    }
+
+    /**
+     * Acquires a PLC connection from the connection pool, passing no additional trace info.
+     * @param plcDriverManager  driver manager handling connections and pools
+     * @param connectionString  connection string as defined by the respective implementation of {@link PlcDriver}
+     * @param executorService   ExecutorService holding a pool of threads handling the requests
+     * @param requestTimeoutMs  maximum time in milliseconds to wait for the future to return a result
+     * @return the {@link PlcConnection} used for acquiring data from the PLC endpoint
+     * @throws InterruptedException
+     * @throws ExecutionException
+     * @throws TimeoutException
+     */
+    public static PlcConnection getPlcConnection(PlcDriverManager plcDriverManager,
+                                                 String connectionString,
+                                                 ExecutorService executorService,
+                                                 long requestTimeoutMs) throws InterruptedException, ExecutionException, TimeoutException {
+        return getPlcConnection(plcDriverManager,connectionString,executorService,requestTimeoutMs,"");
     }
 
     /**
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTask.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTask.java
index cb037a6..dc0d676 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTask.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTask.java
@@ -95,6 +95,7 @@ public class TriggeredScraperTask implements ScraperTask {
     public void run() {
         if(this.triggerHandler.checkTrigger()) {
             // Does a single fetch only when trigger is valid
+            LOGGER.info("Trigger for job {} and device {} is met ... scraping desired data",jobName,connectionAlias);
             if(LOGGER.isTraceEnabled()) {
                 LOGGER.trace("Start new scrape of task of job {} for connection {}", jobName, connectionAlias);
             }
@@ -103,7 +104,12 @@ public class TriggeredScraperTask implements ScraperTask {
             stopWatch.start();
             PlcConnection connection = null;
             try {
-                connection = TriggeredScraperImpl.getPlcConnection(driverManager,connectionString,executorService,requestTimeoutMs);
+                String info = "";
+                if(LOGGER.isTraceEnabled()) {
+                    info = String.format("acquiring data collecting connection to (%s,%s)", connectionAlias,jobName);
+                    LOGGER.trace("acquiring data collecting connection to ({},{})", connectionAlias,jobName);
+                }
+                connection = TriggeredScraperImpl.getPlcConnection(driverManager,connectionString,executorService,requestTimeoutMs,info);
                 if(LOGGER.isTraceEnabled()) {
                     LOGGER.trace("Connection to {} established: {}", connectionString, connection);
                 }
@@ -229,7 +235,6 @@ public class TriggeredScraperTask implements ScraperTask {
             ", jobName='" + jobName + '\'' +
             ", connectionAlias='" + connectionAlias + '\'' +
             ", connectionString='" + connectionString + '\'' +
-            ", fields=" + fields +
             ", requestTimeoutMs=" + requestTimeoutMs +
             ", executorService=" + executorService +
             ", resultHandler=" + resultHandler +
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerConfiguration.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerConfiguration.java
index 9394e8b..0e74682 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerConfiguration.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerConfiguration.java
@@ -198,7 +198,11 @@ public class TriggerConfiguration{
         boolean evaluateTrigger() throws ScraperException {
             List<Boolean> triggerResultList = new ArrayList<>();
             if(logger.isTraceEnabled()){
-                logger.trace("eval values: {}",acquiredValuesList);
+                String connString = "empty";
+                if(!triggerElementList.isEmpty()) {
+                    connString = triggerElementList.get(0).getPlcConnectionString();
+                }
+                logger.trace("eval values for job {} and {}: {}",triggeredScrapeJobImpl.getJobName(),connString,acquiredValuesList);
             }
             for(int countElements=0; countElements<acquiredValuesList.size();countElements++){
                 TriggerElement triggerElement = triggerElementList.get(countElements);
@@ -486,6 +490,8 @@ public class TriggerConfiguration{
         private PlcField plcField;
         private String plcFieldString;
 
+        private String plcConnectionString;
+
         private String uuid;
 
         private String triggerJob;
@@ -501,6 +507,7 @@ public class TriggerConfiguration{
             this.plcField = null;
             this.plcFieldString = null;
             this.reservedCompareValue = null;
+            this.plcConnectionString="not defined";
             this.triggerJob = "Not yet defined";
             this.uuid = "";
         }
@@ -524,6 +531,7 @@ public class TriggerConfiguration{
         TriggerElement(String comparator, String concatType, String compareValue, String plcField, String triggerStrategy) throws ScraperException {
             this();
             this.plcFieldString = plcField;
+            this.plcConnectionString = plcConnectionString;
             if(triggerStrategy.equals(S_7_TRIGGER_VAR)){
                 try {
                     this.plcField = S7Field.of(this.plcFieldString);
@@ -693,6 +701,14 @@ public class TriggerConfiguration{
             }
         }
 
+        public String getPlcConnectionString() {
+            return plcConnectionString;
+        }
+
+        public void setPlcConnectionString(String plcConnectionString) {
+            this.plcConnectionString = plcConnectionString;
+        }
+
         public String getUuid() {
             return uuid;
         }
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerHandlerImpl.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerHandlerImpl.java
index 432b575..6a2c551 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerHandlerImpl.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerHandlerImpl.java
@@ -45,6 +45,7 @@ public class TriggerHandlerImpl implements TriggerHandler {
 
         //transmit needed trigger to triggerCollection
         for(TriggerConfiguration.TriggerElement triggerElement:triggerConfiguration.getTriggerElementList()){
+            triggerElement.setPlcConnectionString(parentScraperTask.getConnectionString());
             triggerElement.setUuid(triggerCollector.submitTrigger(triggerElement.getPlcFieldString(),parentScraperTask.getConnectionString(),this.triggerConfiguration.getScrapeInterval()));
         }
 
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/collector/TriggerCollectorImpl.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/collector/TriggerCollectorImpl.java
index 113eca7..e4be70a 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/collector/TriggerCollectorImpl.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/collector/TriggerCollectorImpl.java
@@ -127,6 +127,7 @@ public class TriggerCollectorImpl implements TriggerCollector {
         Map<String, PlcReadRequest.Builder> plcReadRequestBuilderMap = new HashMap<>();
         Map<String, PlcReadResponse> plcReadResponseMap = new HashMap<>();
         List<RequestElement> activeRequestElements = new ArrayList<>();
+        List<PlcConnection> plcConnectionList = new ArrayList<>();
         PlcConnection plcConnection=null;
         for(Map.Entry<String,RequestElement> entry:currentRequestElements.entrySet()){
             if(entry.getValue().getLastAcquirement().isBefore(
@@ -136,7 +137,13 @@ public class TriggerCollectorImpl implements TriggerCollector {
                 String plcConnectionString = entry.getValue().plcConnectionString;
                 if(!plcReadRequestBuilderMap.containsKey(plcConnectionString)){
                     try {
-                        plcConnection = TriggeredScraperImpl.getPlcConnection(plcDriverManager,plcConnectionString,executorService,futureTimeout);
+                        String info = "";
+                        if(logger.isTraceEnabled()) {
+                            info = String.format("acquiring trigger connection to (%s)", plcConnectionString);
+                            logger.trace("acquiring trigger connection to ({})", plcConnectionString);
+                        }
+                        plcConnection = TriggeredScraperImpl.getPlcConnection(plcDriverManager,plcConnectionString,executorService,futureTimeout,info);
+                        plcConnectionList.add(plcConnection);
                         plcReadRequestBuilderMap.put(plcConnectionString,plcConnection.readRequestBuilder());
                         plcReadRequestBuilderMap.get(plcConnectionString).addItem(entry.getKey(),entry.getValue().getPlcField());
                         activeRequestElements.add(entry.getValue());
@@ -175,13 +182,16 @@ public class TriggerCollectorImpl implements TriggerCollector {
             requestElement.setResult(plcReadResponseMap.get(requestElement.getPlcConnectionString()).getObject(requestElement.getUuid()));
             requestElement.setLastAcquirement(currentTime);
         }
-        if(plcConnection!=null){
-            try {
-                plcConnection.close();
-            } catch (Exception e) {
-                logger.warn("Could not close connectiom ...");
+        for(PlcConnection plcConnectionFromList:plcConnectionList){
+            if(plcConnectionFromList!=null){
+                try {
+                    plcConnectionFromList.close();
+                } catch (Exception e) {
+                    logger.warn("Could not close connection ...");
+                }
             }
         }
+
     }
 
     /**