You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by tm...@apache.org on 2019/05/08 12:10:46 UTC
[plc4x] branch scraper_refactoring_and_improvement updated: fixed a
bug when scraping multiple sources
This is an automated email from the ASF dual-hosted git repository.
tmitsch pushed a commit to branch scraper_refactoring_and_improvement
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/scraper_refactoring_and_improvement by this push:
new 7c03131 fixed a bug when scraping multiple sources
7c03131 is described below
commit 7c031314e0064ece85bcaa9b1fd3f4f81bddd0b1
Author: Tim Mitsch <t....@pragmaticindustries.de>
AuthorDate: Wed May 8 14:10:40 2019 +0200
fixed a bug when scraping multiple sources
---
.../triggeredscraper/TriggeredScraperImpl.java | 38 ++++++++++++++++++++--
.../triggeredscraper/TriggeredScraperTask.java | 9 +++--
.../triggerhandler/TriggerConfiguration.java | 18 +++++++++-
.../triggerhandler/TriggerHandlerImpl.java | 1 +
.../collector/TriggerCollectorImpl.java | 22 +++++++++----
5 files changed, 77 insertions(+), 11 deletions(-)
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java
index 7de9e56..4b76214 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java
@@ -226,6 +226,7 @@ public class TriggeredScraperImpl implements Scraper {
* @param connectionString Connection string as defined in the regarding implementation of {@link PlcDriver}
* @param executorService ExecuterService holding a pool as threads handling requests and stuff
* @param requestTimeoutMs maximum awaiting for the the future to return a result
+ * @param info additional info for trace reasons
* @return the {@link PlcConnection} used for acquiring data from PLC endpoint
* @throws InterruptedException
* @throws ExecutionException
@@ -234,7 +235,11 @@ public class TriggeredScraperImpl implements Scraper {
public static PlcConnection getPlcConnection(PlcDriverManager plcDriverManager,
String connectionString,
ExecutorService executorService,
- long requestTimeoutMs) throws InterruptedException, ExecutionException, TimeoutException {
+ long requestTimeoutMs,
+ String info) throws InterruptedException, ExecutionException, TimeoutException {
+ if(!info.isEmpty() && LOGGER.isTraceEnabled()){
+ LOGGER.trace("Additional Info from caller {}", info);
+ }
CompletableFuture<PlcConnection> future = CompletableFuture.supplyAsync(() -> {
try {
return plcDriverManager.getConnection(connectionString);
@@ -247,7 +252,36 @@ public class TriggeredScraperImpl implements Scraper {
throw new PlcRuntimeException(e);
}
}, executorService);
- return future.get(requestTimeoutMs, TimeUnit.MILLISECONDS);
+ if(LOGGER.isTraceEnabled()){
+ LOGGER.trace("try to get a connection to {}", connectionString);
+ }
+ PlcConnection plcConnection=null;
+ try {
+ plcConnection = future.get(requestTimeoutMs, TimeUnit.MILLISECONDS);
+ }
+ catch (Exception e){
+ LOGGER.trace("Additional Info from caller {}", info,e);
+ throw e;
+ }
+ return plcConnection;
+ }
+
+ /**
+ * acquires a plc connection from connection pool
+ * @param plcDriverManager Driver manager handling connection and pools
+ * @param connectionString Connection string as defined in the regarding implementation of {@link PlcDriver}
+ * @param executorService ExecutorService holding a pool of threads handling requests and stuff
+ * @param requestTimeoutMs maximum time to wait for the future to return a result
+ * @return the {@link PlcConnection} used for acquiring data from PLC endpoint
+ * @throws InterruptedException
+ * @throws ExecutionException
+ * @throws TimeoutException
+ */
+ public static PlcConnection getPlcConnection(PlcDriverManager plcDriverManager,
+ String connectionString,
+ ExecutorService executorService,
+ long requestTimeoutMs) throws InterruptedException, ExecutionException, TimeoutException {
+ return getPlcConnection(plcDriverManager,connectionString,executorService,requestTimeoutMs,"");
}
/**
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTask.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTask.java
index cb037a6..dc0d676 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTask.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTask.java
@@ -95,6 +95,7 @@ public class TriggeredScraperTask implements ScraperTask {
public void run() {
if(this.triggerHandler.checkTrigger()) {
// Does a single fetch only when trigger is valid
+ LOGGER.info("Trigger for job {} and device {} is met ... scraping desired data",jobName,connectionAlias);
if(LOGGER.isTraceEnabled()) {
LOGGER.trace("Start new scrape of task of job {} for connection {}", jobName, connectionAlias);
}
@@ -103,7 +104,12 @@ public class TriggeredScraperTask implements ScraperTask {
stopWatch.start();
PlcConnection connection = null;
try {
- connection = TriggeredScraperImpl.getPlcConnection(driverManager,connectionString,executorService,requestTimeoutMs);
+ String info = "";
+ if(LOGGER.isTraceEnabled()) {
+ info = String.format("acquiring data collecting connection to (%s,%s)", connectionAlias,jobName);
+ LOGGER.trace("acquiring data collecting connection to ({},{})", connectionAlias,jobName);
+ }
+ connection = TriggeredScraperImpl.getPlcConnection(driverManager,connectionString,executorService,requestTimeoutMs,info);
if(LOGGER.isTraceEnabled()) {
LOGGER.trace("Connection to {} established: {}", connectionString, connection);
}
@@ -229,7 +235,6 @@ public class TriggeredScraperTask implements ScraperTask {
", jobName='" + jobName + '\'' +
", connectionAlias='" + connectionAlias + '\'' +
", connectionString='" + connectionString + '\'' +
- ", fields=" + fields +
", requestTimeoutMs=" + requestTimeoutMs +
", executorService=" + executorService +
", resultHandler=" + resultHandler +
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerConfiguration.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerConfiguration.java
index 9394e8b..0e74682 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerConfiguration.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerConfiguration.java
@@ -198,7 +198,11 @@ public class TriggerConfiguration{
boolean evaluateTrigger() throws ScraperException {
List<Boolean> triggerResultList = new ArrayList<>();
if(logger.isTraceEnabled()){
- logger.trace("eval values: {}",acquiredValuesList);
+ String connString = "empty";
+ if(!triggerElementList.isEmpty()) {
+ connString = triggerElementList.get(0).getPlcConnectionString();
+ }
+ logger.trace("eval values for job {} and {}: {}",triggeredScrapeJobImpl.getJobName(),connString,acquiredValuesList);
}
for(int countElements=0; countElements<acquiredValuesList.size();countElements++){
TriggerElement triggerElement = triggerElementList.get(countElements);
@@ -486,6 +490,8 @@ public class TriggerConfiguration{
private PlcField plcField;
private String plcFieldString;
+ private String plcConnectionString;
+
private String uuid;
private String triggerJob;
@@ -501,6 +507,7 @@ public class TriggerConfiguration{
this.plcField = null;
this.plcFieldString = null;
this.reservedCompareValue = null;
+ this.plcConnectionString="not defined";
this.triggerJob = "Not yet defined";
this.uuid = "";
}
@@ -524,6 +531,7 @@ public class TriggerConfiguration{
TriggerElement(String comparator, String concatType, String compareValue, String plcField, String triggerStrategy) throws ScraperException {
this();
this.plcFieldString = plcField;
+ this.plcConnectionString = plcConnectionString;
if(triggerStrategy.equals(S_7_TRIGGER_VAR)){
try {
this.plcField = S7Field.of(this.plcFieldString);
@@ -693,6 +701,14 @@ public class TriggerConfiguration{
}
}
+ public String getPlcConnectionString() {
+ return plcConnectionString;
+ }
+
+ public void setPlcConnectionString(String plcConnectionString) {
+ this.plcConnectionString = plcConnectionString;
+ }
+
public String getUuid() {
return uuid;
}
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerHandlerImpl.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerHandlerImpl.java
index 432b575..6a2c551 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerHandlerImpl.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerHandlerImpl.java
@@ -45,6 +45,7 @@ public class TriggerHandlerImpl implements TriggerHandler {
//transmit needed trigger to triggerCollection
for(TriggerConfiguration.TriggerElement triggerElement:triggerConfiguration.getTriggerElementList()){
+ triggerElement.setPlcConnectionString(parentScraperTask.getConnectionString());
triggerElement.setUuid(triggerCollector.submitTrigger(triggerElement.getPlcFieldString(),parentScraperTask.getConnectionString(),this.triggerConfiguration.getScrapeInterval()));
}
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/collector/TriggerCollectorImpl.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/collector/TriggerCollectorImpl.java
index 113eca7..e4be70a 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/collector/TriggerCollectorImpl.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/collector/TriggerCollectorImpl.java
@@ -127,6 +127,7 @@ public class TriggerCollectorImpl implements TriggerCollector {
Map<String, PlcReadRequest.Builder> plcReadRequestBuilderMap = new HashMap<>();
Map<String, PlcReadResponse> plcReadResponseMap = new HashMap<>();
List<RequestElement> activeRequestElements = new ArrayList<>();
+ List<PlcConnection> plcConnectionList = new ArrayList<>();
PlcConnection plcConnection=null;
for(Map.Entry<String,RequestElement> entry:currentRequestElements.entrySet()){
if(entry.getValue().getLastAcquirement().isBefore(
@@ -136,7 +137,13 @@ public class TriggerCollectorImpl implements TriggerCollector {
String plcConnectionString = entry.getValue().plcConnectionString;
if(!plcReadRequestBuilderMap.containsKey(plcConnectionString)){
try {
- plcConnection = TriggeredScraperImpl.getPlcConnection(plcDriverManager,plcConnectionString,executorService,futureTimeout);
+ String info = "";
+ if(logger.isTraceEnabled()) {
+ info = String.format("acquiring trigger connection to (%s)", plcConnectionString);
+ logger.trace("acquiring trigger connection to ({})", plcConnectionString);
+ }
+ plcConnection = TriggeredScraperImpl.getPlcConnection(plcDriverManager,plcConnectionString,executorService,futureTimeout,info);
+ plcConnectionList.add(plcConnection);
plcReadRequestBuilderMap.put(plcConnectionString,plcConnection.readRequestBuilder());
plcReadRequestBuilderMap.get(plcConnectionString).addItem(entry.getKey(),entry.getValue().getPlcField());
activeRequestElements.add(entry.getValue());
@@ -175,13 +182,16 @@ public class TriggerCollectorImpl implements TriggerCollector {
requestElement.setResult(plcReadResponseMap.get(requestElement.getPlcConnectionString()).getObject(requestElement.getUuid()));
requestElement.setLastAcquirement(currentTime);
}
- if(plcConnection!=null){
- try {
- plcConnection.close();
- } catch (Exception e) {
- logger.warn("Could not close connectiom ...");
+ for(PlcConnection plcConnectionFromList:plcConnectionList){
+ if(plcConnectionFromList!=null){
+ try {
+ plcConnectionFromList.close();
+ } catch (Exception e) {
+ logger.warn("Could not close connection ...");
+ }
}
}
+
}
/**