You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by er...@apache.org on 2020/05/15 10:40:17 UTC
[plc4x] 02/08: Camel-Scraper
This is an automated email from the ASF dual-hosted git repository.
erobinet pushed a commit to branch feature/scraper
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 912c1effdfc6244b4b7a63f9d316df6ea2710a4d
Author: Etienne Robinet <et...@gmail.com>
AuthorDate: Tue May 12 08:54:56 2020 +0200
Camel-Scraper
---
plc4j/integrations/apache-camel/pom.xml | 18 ++++
.../org/apache/plc4x/camel/Plc4XComponent.java | 8 ++
.../java/org/apache/plc4x/camel/Plc4XConsumer.java | 99 ++++++++++++++++------
.../java/org/apache/plc4x/camel/Plc4XEndpoint.java | 59 ++++++++++---
.../main/java/org/apache/plc4x/camel/TagData.java | 12 +++
plc4j/tools/scraper/pom.xml | 14 +--
.../triggerhandler/TriggerConfiguration.java | 2 -
pom.xml | 2 +-
8 files changed, 163 insertions(+), 51 deletions(-)
diff --git a/plc4j/integrations/apache-camel/pom.xml b/plc4j/integrations/apache-camel/pom.xml
index cfa551a..85cd04f 100644
--- a/plc4j/integrations/apache-camel/pom.xml
+++ b/plc4j/integrations/apache-camel/pom.xml
@@ -128,6 +128,24 @@
<version>3.1.0</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.plc4x</groupId>
+ <artifactId>plc4j-connection-pool</artifactId>
+ <version>0.7.0-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.plc4x</groupId>
+ <artifactId>plc4j-scraper</artifactId>
+ <version>0.7.0-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.plc4x</groupId>
+ <artifactId>plc4j-driver-eip</artifactId>
+ <version>0.7.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<dependencyManagement>
diff --git a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XComponent.java b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XComponent.java
index 82763ff..f7e705f 100644
--- a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XComponent.java
+++ b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XComponent.java
@@ -38,6 +38,14 @@ public class Plc4XComponent extends DefaultComponent {
if(tags!=null){
((Plc4XEndpoint)endpoint).setTags(tags);
}
+ // Optional "trigger" URI parameter: forwarded to the endpoint only when present.
+ String trigger = getAndRemoveOrResolveReferenceParameter(parameters,"trigger",String.class);
+ if(trigger!=null){
+     ((Plc4XEndpoint)endpoint).setTrigger(trigger);
+ }
+ // Optional "period" URI parameter. The lookup yields null when the parameter is absent,
+ // so the result stays boxed; unboxing it directly into an int would throw a
+ // NullPointerException for every URI that does not specify a period.
+ Integer period = getAndRemoveOrResolveReferenceParameter(parameters,"period",Integer.class);
+ if(period!=null && period!=0){
+     ((Plc4XEndpoint)endpoint).setPeriod(period);
+ }
setProperties(endpoint,parameters);
return endpoint;
}
diff --git a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
index 18161db..132aed8 100644
--- a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
+++ b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
@@ -27,13 +27,19 @@ import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.exceptions.PlcException;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
+import org.apache.plc4x.java.scraper.ScrapeJob;
+import org.apache.plc4x.java.scraper.config.JobConfigurationImpl;
+import org.apache.plc4x.java.scraper.config.ScraperConfiguration;
+import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImpl;
+import org.apache.plc4x.java.scraper.exception.ScraperException;
+import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScrapeJobImpl;
+import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl;
+import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollector;
+import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollectorImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.*;
public class Plc4XConsumer extends DefaultConsumer {
@@ -42,18 +48,25 @@ public class Plc4XConsumer extends DefaultConsumer {
private ExceptionHandler exceptionHandler;
private PlcConnection plcConnection;
private List<TagData> tags;
- private Map parameters;
+ private Map<String,String> fields;
+ private String trigger;
private PlcSubscriptionResponse subscriptionResponse;
private Plc4XEndpoint plc4XEndpoint;
private ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
private ScheduledFuture<?> future;
+ private final static String TRIGGER = "TRIGGER_VAR";
+ private final static String PLC_NAME = "PLC";
+
+ /**
+  * Creates a consumer for the given endpoint and caches the endpoint's connection,
+  * its tag list, a tag-name-to-query map derived from that list, and its trigger.
+  *
+  * @param endpoint  the endpoint this consumer reads from
+  * @param processor the processor that receives the created exchanges
+  */
public Plc4XConsumer(Plc4XEndpoint endpoint, Processor processor) throws PlcException {
super(endpoint, processor);
plc4XEndpoint =endpoint;
this.plcConnection = endpoint.getConnection();
this.tags = endpoint.getTags();
+ this.fields = TagData.toMap(this.tags);
+ // A null trigger selects the plain read path in doStart(); otherwise the scraper is used.
+ this.trigger= endpoint.getTrigger();
}
@Override
@@ -76,31 +89,30 @@ public class Plc4XConsumer extends DefaultConsumer {
@Override
protected void doStart() throws InterruptedException, ExecutionException {
- PlcReadRequest.Builder builder = plcConnection.readRequestBuilder();
- if (tags.size()==1){
- TagData tag = tags.get(0);
- builder.addItem(tag.getTagName(),tag.getQuery());
-
- }
- else{
- for(TagData tag : tags){
- builder.addItem(tag.getTagName(),tag.getQuery());
- }
- }
- PlcReadRequest request = builder.build();
- future = executorService.schedule(() -> {
- request.execute().thenAccept(response -> {
+ if(trigger==null) {
+ PlcReadRequest.Builder builder = plcConnection.readRequestBuilder();
+ if (tags.size() == 1) {
+ TagData tag = tags.get(0);
+ builder.addItem(tag.getTagName(), tag.getQuery());
+
+ } else {
+ for (TagData tag : tags) {
+ builder.addItem(tag.getTagName(), tag.getQuery());
+ }
+ }
+ PlcReadRequest request = builder.build();
+ future = executorService.schedule(() -> {
+ request.execute().thenAccept(response -> {
try {
Exchange exchange = plc4XEndpoint.createExchange();
- if (tags.size()>1){
+ if (tags.size() > 1) {
List<TagData> values = new ArrayList<>();
- for(TagData tag : tags){
+ for (TagData tag : tags) {
tag.setValue(response.getObject(tag.getTagName()));
values.add(tag);
}
exchange.getIn().setBody(values);
- }
- else {
+ } else {
TagData tag = tags.get(0);
tag.setValue(response.getAllObjects(tag.getTagName()));
exchange.getIn().setBody(tag);
@@ -110,7 +122,46 @@ public class Plc4XConsumer extends DefaultConsumer {
exceptionHandler.handleException(e);
}
});
- }, 500, TimeUnit.MILLISECONDS);
+ }, 500, TimeUnit.MILLISECONDS);
+ }
+ else{
+
+ ScraperConfiguration configuration = getScraperConfig(TagData.toMap(plc4XEndpoint.getTags()));
+ TriggerCollector collector = new TriggerCollectorImpl(plc4XEndpoint.getPlcDriverManager());
+ try {
+ TriggeredScraperImpl scraper = new TriggeredScraperImpl(configuration, (job, alias, response) -> {
+ try {
+ Exchange exchange = plc4XEndpoint.createExchange();
+ if (tags.size() > 1) {
+ List<TagData> values = new ArrayList<>();
+ for (TagData tag : tags) {
+ tag.setValue(response.get(tag.getTagName()));
+ values.add(tag);
+ }
+ exchange.getIn().setBody(values);
+ } else {
+ TagData tag = tags.get(0);
+ tag.setValue(response.get(tag.getTagName()));
+ exchange.getIn().setBody(tag);
+ }
+ getProcessor().process(exchange);
+ } catch (Exception e) {
+ exceptionHandler.handleException(e);
+ };
+ },collector);
+ scraper.start();
+ collector.start();
+ } catch (ScraperException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ /**
+  * Builds the triggered-scraper configuration for this endpoint. The result holds a single
+  * source (the endpoint URI registered under {@code PLC_NAME}) and a single job named
+  * "PLC4X-Camel" that reads the given fields.
+  *
+  * @param tagList the fields to scrape, handed unchanged to the job configuration
+  * @return a scraper configuration with exactly one source and one job
+  */
+ private ScraperConfigurationTriggeredImpl getScraperConfig(Map<String,String> tagList){
+     // Trigger definition assembled from the endpoint's period and trigger settings.
+     String triggerSpec = "(TRIGGER_VAR," + plc4XEndpoint.getPeriod() + ",(" + plc4XEndpoint.getTrigger() + ")==(true))";
+     JobConfigurationImpl camelJob = new JobConfigurationImpl(
+         "PLC4X-Camel",
+         triggerSpec,
+         0,
+         Collections.singletonList(PLC_NAME),
+         tagList);
+     List<JobConfigurationImpl> jobs = Collections.singletonList(camelJob);
+     Map<String,String> sources = Collections.singletonMap(PLC_NAME, plc4XEndpoint.getUri());
+     return new ScraperConfigurationTriggeredImpl(sources, jobs);
+ }
@Override
diff --git a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XEndpoint.java b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XEndpoint.java
index c7e6a0e..5e493ec 100644
--- a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XEndpoint.java
+++ b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XEndpoint.java
@@ -27,7 +27,7 @@ import org.apache.camel.spi.UriPath;
import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.slf4j.LoggerFactory;
+import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
import java.util.List;
import java.util.Map;
@@ -42,24 +42,61 @@ public class Plc4XEndpoint extends DefaultEndpoint {
@UriParam
private List<TagData> tags;
+ @UriParam
+ private String trigger;
+
+ @UriParam
+ private int period;
+
+ /**
+  * Returns the configured period. Plc4XConsumer inserts this value into the trigger
+  * definition it hands to the scraper (unit presumably milliseconds — TODO confirm).
+  */
+ public int getPeriod() {
+     return this.period;
+ }
+
+ /**
+  * Sets the period used when the trigger definition is built.
+  *
+  * @param period the new period value
+  */
+ public void setPeriod(int period) {
+     this.period = period;
+ }
private final PlcDriverManager plcDriverManager;
private PlcConnection connection;
private String uri;
+ /**
+  * Returns the connection URI held by this endpoint; the constructor stores the endpoint
+  * URI here with its leading "plc4x:" scheme prefix removed.
+  */
+ public String getUri() {
+     return this.uri;
+ }
+
+ /**
+  * Returns the trigger expression, or null when none was configured.
+  */
+ public String getTrigger() {
+     return this.trigger;
+ }
+
+ /**
+  * Sets the trigger expression for this endpoint.
+  *
+  * @param trigger the trigger expression
+  */
+ public void setTrigger(String trigger) {
+     this.trigger = trigger;
+ }
+
+ /**
+  * Creates the endpoint and immediately opens the PLC connection, so that one connection
+  * is reused instead of reconnecting for every request. The connection URI is the endpoint
+  * URI with its leading "plc4x:" scheme prefix removed.
+  *
+  * @param endpointUri the full Camel endpoint URI
+  * @param component   the component creating this endpoint
+  */
public Plc4XEndpoint(String endpointUri, Component component) {
super(endpointUri, component);
- plcDriverManager= new PlcDriverManager();
- uri = endpointUri;
-
- //Here we establish the connection in the endpoint, as it is created once during the context
- // to avoid disconnecting and reconnecting for every request
- try {
+ // NOTE(review): `trigger` has no initializer and Plc4XComponent calls setTrigger only after
+ // the endpoint has been constructed, so it appears to always be null at this point and the
+ // pooled branch below is never taken — TODO confirm and move the choice of driver manager
+ // to a point where the trigger is known.
+ if(trigger==null) {
+ plcDriverManager = new PlcDriverManager();
+ uri = endpointUri;
+ //Here we establish the connection in the endpoint, as it is created once during the context
+ // to avoid disconnecting and reconnecting for every request
+ try {
+ String plc4xURI = uri.replaceFirst("plc4x:/?/?", "");
+ uri=plc4xURI;
+ connection = plcDriverManager.getConnection(plc4xURI);
+
+ } catch (PlcConnectionException e) {
+ e.printStackTrace();
+ }
+ }
+ else {
+ plcDriverManager = new PooledPlcDriverManager();
- String plc4xURI = uri.replaceFirst("plc4x:/?/?", "");
- connection = plcDriverManager.getConnection(plc4xURI);
-
- } catch (PlcConnectionException e) {
- e.printStackTrace();
+ // Derive the connection URI from the constructor argument: the `uri` field has not been
+ // assigned yet in this branch, so reading it here would throw a NullPointerException.
+ String plc4xURI = endpointUri.replaceFirst("plc4x:/?/?", "");
+ uri=plc4xURI;
+ try {
+ connection = plcDriverManager.getConnection(plc4xURI);
+ } catch (PlcConnectionException e) {
+ e.printStackTrace();
+ }
}
}
diff --git a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/TagData.java b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/TagData.java
index f57774f..c6b5d53 100644
--- a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/TagData.java
+++ b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/TagData.java
@@ -18,7 +18,10 @@ under the License.
*/
package org.apache.plc4x.camel;
+import org.slf4j.LoggerFactory;
+
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
@@ -114,4 +117,13 @@ public class TagData {
}
+
+ /**
+  * Converts a list of tags into a map from tag name to query.
+  * When several tags share a name, the last one in the list wins.
+  *
+  * @param tags the tags to convert; must not be null
+  * @return a new mutable map keyed by tag name
+  */
+ public static Map<String,String> toMap(List<TagData> tags){
+     // The INFO-level dump of the context classloader was removed: it was emitted on
+     // every call and is unrelated to building the map.
+     Map<String,String> map = new HashMap<>();
+     for(TagData tag : tags){
+         map.put(tag.getTagName(),tag.getQuery());
+     }
+     return map;
+ }
}
diff --git a/plc4j/tools/scraper/pom.xml b/plc4j/tools/scraper/pom.xml
index ce16973..61605f8 100644
--- a/plc4j/tools/scraper/pom.xml
+++ b/plc4j/tools/scraper/pom.xml
@@ -101,18 +101,7 @@
<version>0.7.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.plc4x</groupId>
- <artifactId>plc4j-driver-s7</artifactId>
- <version>0.7.0-SNAPSHOT</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.plc4x</groupId>
- <artifactId>plc4j-driver-s7</artifactId>
- <version>0.7.0-SNAPSHOT</version>
- <scope>compile</scope>
- </dependency>
+
</dependencies>
<build>
@@ -122,7 +111,6 @@
<artifactId>maven-dependency-plugin</artifactId>
<configuration>
<usedDependencies combine.children="append">
- <!--usedDependency>org.apache.plc4x:plc4j-driver-s7</usedDependency-->
</usedDependencies>
</configuration>
</plugin>
diff --git a/plc4j/tools/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerConfiguration.java b/plc4j/tools/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerConfiguration.java
index 3390432..0f3ddc3 100644
--- a/plc4j/tools/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerConfiguration.java
+++ b/plc4j/tools/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerConfiguration.java
@@ -20,11 +20,9 @@
package org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler;
import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.PlcDriver;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.model.PlcField;
-import org.apache.plc4x.java.s7.readwrite.field.S7Field;
import org.apache.plc4x.java.scraper.exception.ScraperConfigurationException;
import org.apache.plc4x.java.scraper.exception.ScraperException;
import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScrapeJobImpl;
diff --git a/pom.xml b/pom.xml
index 6932577..1af28d2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -155,7 +155,7 @@
<pcap4j.version>1.8.2</pcap4j.version>
<scala.version>2.12.6</scala.version>
<slf4j.version>1.7.25</slf4j.version>
- <snakeyaml.version>1.23</snakeyaml.version>
+ <snakeyaml.version>1.24</snakeyaml.version>
<spock-reports.version>1.6.1</spock-reports.version>
<spock.version>1.2-groovy-2.5</spock.version>
<t-digest.version>3.2</t-digest.version>