You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by er...@apache.org on 2020/05/15 10:40:17 UTC

[plc4x] 02/08: Camel-Scraper

This is an automated email from the ASF dual-hosted git repository.

erobinet pushed a commit to branch feature/scraper
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 912c1effdfc6244b4b7a63f9d316df6ea2710a4d
Author: Etienne Robinet <et...@gmail.com>
AuthorDate: Tue May 12 08:54:56 2020 +0200

    Camel-Scraper
---
 plc4j/integrations/apache-camel/pom.xml            | 18 ++++
 .../org/apache/plc4x/camel/Plc4XComponent.java     |  8 ++
 .../java/org/apache/plc4x/camel/Plc4XConsumer.java | 99 ++++++++++++++++------
 .../java/org/apache/plc4x/camel/Plc4XEndpoint.java | 59 ++++++++++---
 .../main/java/org/apache/plc4x/camel/TagData.java  | 12 +++
 plc4j/tools/scraper/pom.xml                        | 14 +--
 .../triggerhandler/TriggerConfiguration.java       |  2 -
 pom.xml                                            |  2 +-
 8 files changed, 163 insertions(+), 51 deletions(-)

diff --git a/plc4j/integrations/apache-camel/pom.xml b/plc4j/integrations/apache-camel/pom.xml
index cfa551a..85cd04f 100644
--- a/plc4j/integrations/apache-camel/pom.xml
+++ b/plc4j/integrations/apache-camel/pom.xml
@@ -128,6 +128,24 @@
       <version>3.1.0</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.plc4x</groupId>
+      <artifactId>plc4j-connection-pool</artifactId>
+      <version>0.7.0-SNAPSHOT</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.plc4x</groupId>
+      <artifactId>plc4j-scraper</artifactId>
+      <version>0.7.0-SNAPSHOT</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.plc4x</groupId>
+      <artifactId>plc4j-driver-eip</artifactId>
+      <version>0.7.0-SNAPSHOT</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <dependencyManagement>
diff --git a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XComponent.java b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XComponent.java
index 82763ff..f7e705f 100644
--- a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XComponent.java
+++ b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XComponent.java
@@ -38,6 +38,14 @@ public class Plc4XComponent extends DefaultComponent {
         if(tags!=null){
             ((Plc4XEndpoint)endpoint).setTags(tags);
         }
+        String trigger = getAndRemoveOrResolveReferenceParameter(parameters,"trigger",String.class);
+        if(trigger!=null){
+            ((Plc4XEndpoint)endpoint).setTrigger(trigger);
+        }
+        Integer period = getAndRemoveOrResolveReferenceParameter(parameters,"period",Integer.class); // kept boxed: the lookup may return null, and unboxing null to int throws NullPointerException
+        if(period!=null){
+            ((Plc4XEndpoint)endpoint).setPeriod(period);
+        }
         setProperties(endpoint,parameters);
         return endpoint;
     }
diff --git a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
index 18161db..132aed8 100644
--- a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
+++ b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
@@ -27,13 +27,19 @@ import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcException;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
+import org.apache.plc4x.java.scraper.ScrapeJob;
+import org.apache.plc4x.java.scraper.config.JobConfigurationImpl;
+import org.apache.plc4x.java.scraper.config.ScraperConfiguration;
+import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImpl;
+import org.apache.plc4x.java.scraper.exception.ScraperException;
+import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScrapeJobImpl;
+import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl;
+import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollector;
+import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollectorImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.*;
 
 public class Plc4XConsumer extends DefaultConsumer {
@@ -42,18 +48,25 @@ public class Plc4XConsumer extends DefaultConsumer {
     private ExceptionHandler exceptionHandler;
     private PlcConnection plcConnection;
     private  List<TagData> tags;
-    private Map parameters;
+    private  Map<String,String> fields;
+    private String trigger;
     private PlcSubscriptionResponse subscriptionResponse;
     private Plc4XEndpoint plc4XEndpoint;
 
     private ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
     private ScheduledFuture<?> future;
 
+    private final static String TRIGGER = "TRIGGER_VAR";
+    private final static String PLC_NAME = "PLC";
+
     public Plc4XConsumer(Plc4XEndpoint endpoint, Processor processor) throws PlcException {
         super(endpoint, processor);
         plc4XEndpoint =endpoint;
         this.plcConnection = endpoint.getConnection();
         this.tags = endpoint.getTags();
+        // Tag name -> query lookup derived from the endpoint's tags; trigger is whatever the endpoint currently holds (may be null).
+        this.fields = TagData.toMap(this.tags);
+        this.trigger= endpoint.getTrigger();
     }
 
     @Override
@@ -76,31 +89,30 @@ public class Plc4XConsumer extends DefaultConsumer {
 
     @Override
     protected void doStart() throws InterruptedException, ExecutionException {
-        PlcReadRequest.Builder builder = plcConnection.readRequestBuilder();
-        if (tags.size()==1){
-            TagData tag = tags.get(0);
-            builder.addItem(tag.getTagName(),tag.getQuery());
-
-        }
-        else{
-           for(TagData tag : tags){
-               builder.addItem(tag.getTagName(),tag.getQuery());
-           }
-        }
-        PlcReadRequest request = builder.build();
-        future = executorService.schedule(() -> {
-            request.execute().thenAccept(response -> {
+        if(trigger==null) {
+            PlcReadRequest.Builder builder = plcConnection.readRequestBuilder();
+            if (tags.size() == 1) {
+                TagData tag = tags.get(0);
+                builder.addItem(tag.getTagName(), tag.getQuery());
+
+            } else {
+                for (TagData tag : tags) {
+                    builder.addItem(tag.getTagName(), tag.getQuery());
+                }
+            }
+            PlcReadRequest request = builder.build();
+            future = executorService.schedule(() -> {
+                request.execute().thenAccept(response -> {
                     try {
                         Exchange exchange = plc4XEndpoint.createExchange();
-                        if (tags.size()>1){
+                        if (tags.size() > 1) {
                             List<TagData> values = new ArrayList<>();
-                            for(TagData tag : tags){
+                            for (TagData tag : tags) {
                                 tag.setValue(response.getObject(tag.getTagName()));
                                 values.add(tag);
                             }
                             exchange.getIn().setBody(values);
-                        }
-                        else {
+                        } else {
                             TagData tag = tags.get(0);
                             tag.setValue(response.getAllObjects(tag.getTagName()));
                             exchange.getIn().setBody(tag);
@@ -110,7 +122,46 @@ public class Plc4XConsumer extends DefaultConsumer {
                         exceptionHandler.handleException(e);
                     }
                 });
-        }, 500, TimeUnit.MILLISECONDS);
+            }, 500, TimeUnit.MILLISECONDS);
+        }
+        else{
+
+            ScraperConfiguration configuration =  getScraperConfig(TagData.toMap(plc4XEndpoint.getTags()));
+            TriggerCollector collector = new TriggerCollectorImpl(plc4XEndpoint.getPlcDriverManager());
+            try {
+                TriggeredScraperImpl scraper = new TriggeredScraperImpl(configuration, (job, alias, response) -> {
+                    try {
+                        Exchange exchange = plc4XEndpoint.createExchange();
+                        if (tags.size() > 1) {
+                            List<TagData> values = new ArrayList<>();
+                            for (TagData tag : tags) {
+                                tag.setValue(response.get(tag.getTagName()));
+                                values.add(tag);
+                            }
+                            exchange.getIn().setBody(values);
+                        } else {
+                            TagData tag = tags.get(0);
+                            tag.setValue(response.get(tag.getTagName()));
+                            exchange.getIn().setBody(tag);
+                        }
+                        getProcessor().process(exchange);
+                    } catch (Exception e) {
+                        exceptionHandler.handleException(e);
+                    };
+                    },collector);
+                scraper.start();
+                collector.start();
+            } catch (ScraperException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /** Builds the scraper configuration: one "PLC4X-Camel" job over the source PLC_NAME (mapped to the endpoint URI), carrying tagList and a trigger string assembled from the endpoint's period and trigger. */
+    private ScraperConfigurationTriggeredImpl getScraperConfig(Map<String,String> tagList){
+        String config = "("+TRIGGER+","+plc4XEndpoint.getPeriod()+",("+ plc4XEndpoint.getTrigger() +")==(true))";
+        List<JobConfigurationImpl> job = Collections.singletonList(new JobConfigurationImpl("PLC4X-Camel",config,0,Collections.singletonList(PLC_NAME),tagList));
+        return new ScraperConfigurationTriggeredImpl(Collections.singletonMap(PLC_NAME,plc4XEndpoint.getUri()),job);
     }
 
     @Override
diff --git a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XEndpoint.java b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XEndpoint.java
index c7e6a0e..5e493ec 100644
--- a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XEndpoint.java
+++ b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XEndpoint.java
@@ -27,7 +27,7 @@ import org.apache.camel.spi.UriPath;
 import org.apache.plc4x.java.PlcDriverManager;
 import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.slf4j.LoggerFactory;
+import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
 
 import java.util.List;
 import java.util.Map;
@@ -42,24 +42,61 @@ public class Plc4XEndpoint extends DefaultEndpoint {
     @UriParam
     private List<TagData> tags;
 
+    @UriParam
+    private String trigger;
+
+    @UriParam
+    private int period;
+
+    public int getPeriod() {
+        return period;
+    }
+
+    public void setPeriod(int period) {
+        this.period = period;
+    }
 
     private final PlcDriverManager plcDriverManager;
     private  PlcConnection connection;
     private String uri;
 
+    public String getUri() {
+        return uri;
+    }
+
+    public String getTrigger() {
+        return trigger;
+    }
+
+    public void setTrigger(String trigger) {
+        this.trigger = trigger;
+    }
+
     public Plc4XEndpoint(String endpointUri, Component component) {
         super(endpointUri, component);
-        plcDriverManager= new PlcDriverManager();
-        uri = endpointUri;
-
-        //Here we establish the connection in the endpoint, as it is created once during the context
-        // to avoid disconnecting and reconnecting for every request
-        try {
+        if(trigger==null) { // NOTE(review): nothing visible assigns trigger before the constructor runs (only setTrigger does), so this test looks always true - confirm
+            plcDriverManager = new PlcDriverManager();
+            uri = endpointUri;
+            //Here we establish the connection in the endpoint, as it is created once during the context
+            // to avoid disconnecting and reconnecting for every request
+            try {
+                String plc4xURI = uri.replaceFirst("plc4x:/?/?", "");
+                uri=plc4xURI;
+                connection = plcDriverManager.getConnection(plc4xURI);
+
+            } catch (PlcConnectionException e) {
+                e.printStackTrace(); // NOTE(review): the failure is only printed; connection is left unassigned (null)
+            }
+        }
+        else { // NOTE(review): uri is never assigned on this path before it is dereferenced below - would throw NullPointerException if reached; confirm
+            plcDriverManager = new PooledPlcDriverManager();
             String plc4xURI = uri.replaceFirst("plc4x:/?/?", "");
-            connection = plcDriverManager.getConnection(plc4xURI);
-
-        } catch (PlcConnectionException e) {
-            e.printStackTrace();
+            uri=plc4xURI;
+            try {
+                connection = plcDriverManager.getConnection(plc4xURI);
+            } catch (PlcConnectionException e) {
+                e.printStackTrace();
+            }
         }
     }
 
diff --git a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/TagData.java b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/TagData.java
index f57774f..c6b5d53 100644
--- a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/TagData.java
+++ b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/TagData.java
@@ -18,7 +18,10 @@ under the License.
 */
 package org.apache.plc4x.camel;
 
+import org.slf4j.LoggerFactory;
+
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.function.Predicate;
 
@@ -114,4 +117,13 @@ public class TagData {
 
 
     }
+
+    /** Returns a new map from each tag's name to its query; a later tag with the same name replaces the earlier entry. */
+    public static Map<String,String> toMap(List<TagData> tags){
+        Map<String,String> map = new HashMap<>();
+        for(TagData tag : tags){
+            map.put(tag.getTagName(),tag.getQuery());
+        }
+        return map;
+    }
 }
diff --git a/plc4j/tools/scraper/pom.xml b/plc4j/tools/scraper/pom.xml
index ce16973..61605f8 100644
--- a/plc4j/tools/scraper/pom.xml
+++ b/plc4j/tools/scraper/pom.xml
@@ -101,18 +101,7 @@
       <version>0.7.0-SNAPSHOT</version>
       <scope>test</scope>
     </dependency>
-      <dependency>
-          <groupId>org.apache.plc4x</groupId>
-          <artifactId>plc4j-driver-s7</artifactId>
-          <version>0.7.0-SNAPSHOT</version>
-          <scope>compile</scope>
-      </dependency>
-      <dependency>
-          <groupId>org.apache.plc4x</groupId>
-          <artifactId>plc4j-driver-s7</artifactId>
-          <version>0.7.0-SNAPSHOT</version>
-          <scope>compile</scope>
-      </dependency>
+
   </dependencies>
 
   <build>
@@ -122,7 +111,6 @@
         <artifactId>maven-dependency-plugin</artifactId>
         <configuration>
           <usedDependencies combine.children="append">
-            <!--usedDependency>org.apache.plc4x:plc4j-driver-s7</usedDependency-->
           </usedDependencies>
         </configuration>
       </plugin>
diff --git a/plc4j/tools/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerConfiguration.java b/plc4j/tools/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerConfiguration.java
index 3390432..0f3ddc3 100644
--- a/plc4j/tools/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerConfiguration.java
+++ b/plc4j/tools/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerConfiguration.java
@@ -20,11 +20,9 @@
 package org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler;
 
 import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.PlcDriver;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.model.PlcField;
-import org.apache.plc4x.java.s7.readwrite.field.S7Field;
 import org.apache.plc4x.java.scraper.exception.ScraperConfigurationException;
 import org.apache.plc4x.java.scraper.exception.ScraperException;
 import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScrapeJobImpl;
diff --git a/pom.xml b/pom.xml
index 6932577..1af28d2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -155,7 +155,7 @@
     <pcap4j.version>1.8.2</pcap4j.version>
     <scala.version>2.12.6</scala.version>
     <slf4j.version>1.7.25</slf4j.version>
-    <snakeyaml.version>1.23</snakeyaml.version>
+    <snakeyaml.version>1.24</snakeyaml.version>
     <spock-reports.version>1.6.1</spock-reports.version>
     <spock.version>1.2-groovy-2.5</spock.version>
     <t-digest.version>3.2</t-digest.version>