You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by er...@apache.org on 2020/05/15 10:40:20 UTC

[plc4x] 05/08: Camel using Scraper -trigger and period parameters to use Triggered Scraper

This is an automated email from the ASF dual-hosted git repository.

erobinet pushed a commit to branch feature/scraper
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 8a1db07fa1a845a52339ed37676f4a9737712438
Merge: e5d3cdd 2dbaa94
Author: Etienne Robinet <et...@gmail.com>
AuthorDate: Tue May 12 14:03:37 2020 +0200

    Camel using Scraper
    -trigger and period parameters to use Triggered Scraper

 .../org/apache/plc4x/java/mock/MockDriver.java     |  6 ++
 .../apache/plc4x/java/opcua/OpcuaPlcDriver.java    |  1 -
 .../src/main/resources/config.yaml                 | 10 ---
 plc4j/integrations/apache-camel/pom.xml            |  6 ++
 .../java/org/apache/plc4x/camel/Constants.java     |  3 +-
 .../org/apache/plc4x/camel/Plc4XComponent.java     | 14 ++--
 .../java/org/apache/plc4x/camel/Plc4XConsumer.java | 74 ++++++++---------
 .../java/org/apache/plc4x/camel/Plc4XEndpoint.java | 94 ++++++++++------------
 .../apache/plc4x/camel/Plc4XPollingConsumer.java   | 27 +++++--
 .../java/org/apache/plc4x/camel/Plc4XProducer.java | 26 +++---
 .../org/apache/plc4x/camel/Plc4XComponentTest.java | 10 +--
 plc4j/karaf-features/camel/pom.xml                 |  2 +-
 .../triggeredscraper/TriggeredScraperImplTest.java |  3 +-
 .../test/resources/example_triggered_scraper.yml   |  8 +-
 .../src/test/resources/mock-scraper-config.yml     |  4 +-
 .../asciidoc/developers/release/validation.adoc    |  5 ++
 16 files changed, 149 insertions(+), 144 deletions(-)

diff --cc plc4j/drivers/mock/src/main/java/org/apache/plc4x/java/mock/MockDriver.java
index 3878fa0,3878fa0..e04a55d
--- a/plc4j/drivers/mock/src/main/java/org/apache/plc4x/java/mock/MockDriver.java
+++ b/plc4j/drivers/mock/src/main/java/org/apache/plc4x/java/mock/MockDriver.java
@@@ -23,6 -23,6 +23,7 @@@ import org.apache.plc4x.java.api.authen
  import org.apache.plc4x.java.api.PlcConnection;
  import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
  import org.apache.plc4x.java.mock.connection.MockConnection;
++import org.apache.plc4x.java.mock.field.MockField;
  
  import java.util.Map;
  import java.util.concurrent.ConcurrentHashMap;
@@@ -60,4 -60,4 +61,9 @@@ public class MockDriver implements PlcD
          return connectionMap.computeIfAbsent(deviceName, name -> new MockConnection(authentication));
      }
  
++    @Override
++    public MockField prepareField(String query){
++        return new MockField(query);
++    }
++
  }
diff --cc plc4j/integrations/apache-camel/pom.xml
index 0368d98,cfa551a..fced3e1
--- a/plc4j/integrations/apache-camel/pom.xml
+++ b/plc4j/integrations/apache-camel/pom.xml
@@@ -128,18 -128,6 +128,24 @@@
        <version>3.1.0</version>
        <scope>test</scope>
      </dependency>
 +    <dependency>
 +      <groupId>org.apache.plc4x</groupId>
 +      <artifactId>plc4j-connection-pool</artifactId>
 +      <version>0.7.0-SNAPSHOT</version>
 +      <scope>compile</scope>
 +    </dependency>
 +    <dependency>
 +      <groupId>org.apache.plc4x</groupId>
 +      <artifactId>plc4j-scraper</artifactId>
 +      <version>0.7.0-SNAPSHOT</version>
 +      <scope>compile</scope>
 +    </dependency>
++    <dependency>
++      <groupId>org.apache.plc4x</groupId>
++      <artifactId>plc4j-scraper</artifactId>
++      <version>0.7.0-SNAPSHOT</version>
++      <scope>compile</scope>
++    </dependency>
    </dependencies>
  
    <dependencyManagement>
diff --cc plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Constants.java
index 733e269,733e269..e5bfd12
--- a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Constants.java
+++ b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Constants.java
@@@ -22,7 -22,7 +22,8 @@@ public class Constants 
  
      public static final String FIELD_NAME_HEADER = "fieldName";
      public static final String FIELD_QUERY_HEADER = "fieldQuery";
--
++    public final static String TRIGGER = "TRIGGER_VAR";
++    public final static String PLC_NAME = "PLC";
      private Constants() {
        throw new IllegalStateException("Utility class!");
      }
diff --cc plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XComponent.java
index f7e705f,82763ff..b629bd2
--- a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XComponent.java
+++ b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XComponent.java
@@@ -21,11 -21,11 +21,10 @@@ package org.apache.plc4x.camel
  import org.apache.camel.Endpoint;
  import org.apache.camel.support.DefaultComponent;
  import org.apache.camel.util.IntrospectionSupport;
++
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
--import java.util.ArrayList;
--import java.util.List;
  import java.util.Map;
  
  public class Plc4XComponent extends DefaultComponent {
@@@ -34,18 -34,10 +33,21 @@@
      @Override
      protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
          Endpoint endpoint = new Plc4XEndpoint(uri, this);
--        List<TagData>tags = getAndRemoveOrResolveReferenceParameter(parameters,"tags", List.class);
++        //Tags have a Name, a query and an optional value (for writing)
++        //Reading --> Map<String,String>
++        //Writing --> Map<String,Map.Entry<String,Object>>
++        Map<String, Object> tags = getAndRemoveOrResolveReferenceParameter(parameters, "tags", Map.class);
          if(tags!=null){
              ((Plc4XEndpoint)endpoint).setTags(tags);
          }
 +        String trigger = getAndRemoveOrResolveReferenceParameter(parameters,"trigger",String.class);
 +        if(trigger!=null){
 +            ((Plc4XEndpoint)endpoint).setTrigger(trigger);
 +        }
-         int period = getAndRemoveOrResolveReferenceParameter(parameters,"period",Integer.class);
-         if(period!=0){
-             ((Plc4XEndpoint)endpoint).setPeriod(period);
++        Object period = getAndRemoveOrResolveReferenceParameter(parameters,"period",Integer.class);
++        if(period!=null && period instanceof Integer){
++            ((Plc4XEndpoint)endpoint).setPeriod((int)period);
 +        }
          setProperties(endpoint,parameters);
          return endpoint;
      }
diff --cc plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
index 132aed8,18161db..e069e2e
--- a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
+++ b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
@@@ -25,17 -25,8 +25,18 @@@ import org.apache.camel.support.Default
  import org.apache.camel.spi.ExceptionHandler;
  import org.apache.plc4x.java.api.PlcConnection;
  import org.apache.plc4x.java.api.exceptions.PlcException;
++import org.apache.plc4x.java.api.exceptions.PlcIncompatibleDatatypeException;
  import org.apache.plc4x.java.api.messages.PlcReadRequest;
  import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
 +import org.apache.plc4x.java.scraper.ScrapeJob;
 +import org.apache.plc4x.java.scraper.config.JobConfigurationImpl;
 +import org.apache.plc4x.java.scraper.config.ScraperConfiguration;
 +import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImpl;
 +import org.apache.plc4x.java.scraper.exception.ScraperException;
 +import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScrapeJobImpl;
 +import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl;
 +import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollector;
 +import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollectorImpl;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -47,26 -41,19 +48,23 @@@ public class Plc4XConsumer extends Defa
  
      private ExceptionHandler exceptionHandler;
      private PlcConnection plcConnection;
--    private  List<TagData> tags;
-     private  Map<String,String> fields;
 -    private Map parameters;
++    private  Map<String,Object> tags;
 +    private String trigger;
      private PlcSubscriptionResponse subscriptionResponse;
      private Plc4XEndpoint plc4XEndpoint;
  
      private ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
      private ScheduledFuture<?> future;
  
-     private final static String TRIGGER = "TRIGGER_VAR";
-     private final static String PLC_NAME = "PLC";
++
 +
      public Plc4XConsumer(Plc4XEndpoint endpoint, Processor processor) throws PlcException {
          super(endpoint, processor);
          plc4XEndpoint =endpoint;
          this.plcConnection = endpoint.getConnection();
          this.tags = endpoint.getTags();
-         this.fields = TagData.toMap(this.tags);
 +        this.trigger= endpoint.getTrigger();
 +        plc4XEndpoint=endpoint;
      }
  
      @Override
@@@ -89,79 -76,41 +87,75 @@@
  
      @Override
      protected void doStart() throws InterruptedException, ExecutionException {
 -        PlcReadRequest.Builder builder = plcConnection.readRequestBuilder();
 -        if (tags.size()==1){
 -            TagData tag = tags.get(0);
 -            builder.addItem(tag.getTagName(),tag.getQuery());
 -
 -        }
 -        else{
 -           for(TagData tag : tags){
 -               builder.addItem(tag.getTagName(),tag.getQuery());
 -           }
 -        }
 -        PlcReadRequest request = builder.build();
 -        future = executorService.schedule(() -> {
 -            request.execute().thenAccept(response -> {
 +        if(trigger==null) {
 +            PlcReadRequest.Builder builder = plcConnection.readRequestBuilder();
-             if (tags.size() == 1) {
-                 TagData tag = tags.get(0);
-                 builder.addItem(tag.getTagName(), tag.getQuery());
- 
-             } else {
-                 for (TagData tag : tags) {
-                     builder.addItem(tag.getTagName(), tag.getQuery());
++            for( String tag : tags.keySet()){
++                try{
++                    String query = (String)tags.get(tag);
++                    builder.addItem(tag,query);
++                }
++                catch (PlcIncompatibleDatatypeException e){
++                    LOGGER.error("For consumer, please use Map<String,String>, currently using {}",tags.getClass().getSimpleName());
 +                }
 +            }
 +            PlcReadRequest request = builder.build();
 +            future = executorService.schedule(() -> {
 +                request.execute().thenAccept(response -> {
                      try {
                          Exchange exchange = plc4XEndpoint.createExchange();
-                         if (tags.size() > 1) {
 -                        if (tags.size()>1){
--                            List<TagData> values = new ArrayList<>();
-                             for (TagData tag : tags) {
 -                            for(TagData tag : tags){
--                                tag.setValue(response.getObject(tag.getTagName()));
--                                values.add(tag);
--                            }
--                            exchange.getIn().setBody(values);
-                         } else {
 -                        }
 -                        else {
--                            TagData tag = tags.get(0);
--                            tag.setValue(response.getAllObjects(tag.getTagName()));
--                            exchange.getIn().setBody(tag);
++                        Map<String,Object> rsp = new HashMap<>();
++                        for(String field : response.getFieldNames()){
++                            rsp.put(field,response.getObject(field));
                          }
++                        exchange.getIn().setBody(rsp);
                          getProcessor().process(exchange);
                      } catch (Exception e) {
                          exceptionHandler.handleException(e);
                      }
                  });
 -        }, 500, TimeUnit.MILLISECONDS);
 +            }, 500, TimeUnit.MILLISECONDS);
 +        }
 +        else{
- 
-             ScraperConfiguration configuration =  getScraperConfig(TagData.toMap(plc4XEndpoint.getTags()));
-             TriggerCollector collector = new TriggerCollectorImpl(plc4XEndpoint.getPlcDriverManager());
 +            try {
++                ScraperConfiguration configuration =  getScraperConfig(validateTags());
++                TriggerCollector collector = new TriggerCollectorImpl(plc4XEndpoint.getPlcDriverManager());
 +                TriggeredScraperImpl scraper = new TriggeredScraperImpl(configuration, (job, alias, response) -> {
++                    LOGGER.info("SCRAPER {} {} {}",job,alias,response);
 +                    try {
 +                        Exchange exchange = plc4XEndpoint.createExchange();
-                         if (tags.size() > 1) {
-                             List<TagData> values = new ArrayList<>();
-                             for (TagData tag : tags) {
-                                 tag.setValue(response.get(tag.getTagName()));
-                                 values.add(tag);
-                             }
-                             exchange.getIn().setBody(values);
-                         } else {
-                             TagData tag = tags.get(0);
-                             tag.setValue(response.get(tag.getTagName()));
-                             exchange.getIn().setBody(tag);
-                         }
++                        exchange.getIn().setBody(response);
 +                        getProcessor().process(exchange);
 +                    } catch (Exception e) {
 +                        exceptionHandler.handleException(e);
 +                    };
 +                    },collector);
 +                scraper.start();
 +                collector.start();
 +            } catch (ScraperException e) {
 +                e.printStackTrace();
 +            }
 +        }
 +    }
 +
++    private Map<String, String> validateTags() {
++        Map<String, String> map = new HashMap<>();
++        for(Map.Entry<String,Object>tag: tags.entrySet()){
++            if(tag.getValue() instanceof String){
++                map.put(tag.getKey(),(String)tag.getValue());
++            }
++        }
++        if(map.size()!=tags.size()){
++            LOGGER.error("At least one entry does not match the format : Map.Entry<String,String> ");
++            return null;
++        }
++        else return map;
++    }
++
 +    private ScraperConfigurationTriggeredImpl getScraperConfig(Map<String,String> tagList){
 +        String config = "(TRIGGER_VAR,"+plc4XEndpoint.getPeriod()+",("+ plc4XEndpoint.getTrigger() +")==(true))";
-         List<JobConfigurationImpl> job = Collections.singletonList(new JobConfigurationImpl("PLC4X-Camel",config,0,Collections.singletonList(PLC_NAME),tagList));
-         Map<String,String> source = Collections.singletonMap(PLC_NAME,plc4XEndpoint.getUri());
++        List<JobConfigurationImpl> job = Collections.singletonList(new JobConfigurationImpl("PLC4X-Camel",config,0,Collections.singletonList(Constants.PLC_NAME),tagList));
++        Map<String,String> source = Collections.singletonMap(Constants.PLC_NAME,plc4XEndpoint.getUri());
 +        return new ScraperConfigurationTriggeredImpl(source,job);
      }
  
      @Override
diff --cc plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XEndpoint.java
index 5e493ec,c7e6a0e..e14e9f5
--- a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XEndpoint.java
+++ b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XEndpoint.java
@@@ -24,6 -24,6 +24,7 @@@ import org.apache.camel.spi.Metadata
  import org.apache.camel.spi.UriEndpoint;
  import org.apache.camel.spi.UriParam;
  import org.apache.camel.spi.UriPath;
++import org.apache.commons.math3.util.Pair;
  import org.apache.plc4x.java.PlcDriverManager;
  import org.apache.plc4x.java.api.PlcConnection;
  import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
@@@ -36,67 -36,30 +37,64 @@@ import java.util.Objects
  @UriEndpoint(scheme = "plc4x", title = "PLC4X", syntax = "plc4x:driver", label = "plc4x")
  public class Plc4XEndpoint extends DefaultEndpoint {
  
--    @UriPath @Metadata(required = true)
++    @UriPath
++    @Metadata(required = true)
      private String driver;
  
      @UriParam
--    private List<TagData> tags;
++    private Map<String, Object> tags;
  
 +    @UriParam
 +    private String trigger;
 +
 +    @UriParam
 +    private int period;
 +
 +    public int getPeriod() {
 +        return period;
 +    }
 +
 +    public void setPeriod(int period) {
 +        this.period = period;
 +    }
  
--    private final PlcDriverManager plcDriverManager;
--    private  PlcConnection connection;
++    private  PlcDriverManager plcDriverManager;
++    private PlcConnection connection;
      private String uri;
  
 +    public String getUri() {
 +        return uri;
 +    }
 +
 +    public String getTrigger() {
 +        return trigger;
 +    }
 +
 +    public void setTrigger(String trigger) {
 +        this.trigger = trigger;
++        plcDriverManager = new PooledPlcDriverManager();
++        String plc4xURI = uri.replaceFirst("plc4x:/?/?", "");
++        uri=plc4xURI;
++        try {
++            connection = plcDriverManager.getConnection(plc4xURI);
++        } catch (PlcConnectionException e) {
++            e.printStackTrace();
++        }
 +    }
 +
      public Plc4XEndpoint(String endpointUri, Component component) {
          super(endpointUri, component);
-         if(trigger==null) {
-             plcDriverManager = new PlcDriverManager();
-             uri = endpointUri;
-             //Here we establish the connection in the endpoint, as it is created once during the context
-             // to avoid disconnecting and reconnecting for every request
-             try {
-                 String plc4xURI = uri.replaceFirst("plc4x:/?/?", "");
-                 uri=plc4xURI;
-                 connection = plcDriverManager.getConnection(plc4xURI);
- 
-             } catch (PlcConnectionException e) {
-                 e.printStackTrace();
-             }
-         }
-         else {
-             plcDriverManager = new PooledPlcDriverManager();
 -        plcDriverManager= new PlcDriverManager();
++        plcDriverManager = new PlcDriverManager();
+         uri = endpointUri;
 -
+         //Here we establish the connection in the endpoint, as it is created once during the context
+         // to avoid disconnecting and reconnecting for every request
+         try {
              String plc4xURI = uri.replaceFirst("plc4x:/?/?", "");
-             uri=plc4xURI;
-             try {
-                 connection = plcDriverManager.getConnection(plc4xURI);
-             } catch (PlcConnectionException e) {
-                 e.printStackTrace();
-             }
++            uri = plc4xURI;
+             connection = plcDriverManager.getConnection(plc4xURI);
+ 
+         } catch (PlcConnectionException e) {
+             e.printStackTrace();
          }
      }
  
@@@ -112,11 -75,11 +110,10 @@@
      @Override
      public Producer createProducer() throws Exception {
          //Checking if connection is still up and reconnecting if not
--        if(!connection.isConnected()){
--            try{
--                connection= plcDriverManager.getConnection(uri.replaceFirst("plc4x:/?/?", ""));
--            }
--            catch (Exception e){
++        if (!connection.isConnected()) {
++            try {
++                connection = plcDriverManager.getConnection(uri.replaceFirst("plc4x:/?/?", ""));
++            } catch (Exception e) {
                  e.printStackTrace();
              }
          }
@@@ -126,11 -89,11 +123,10 @@@
      @Override
      public Consumer createConsumer(Processor processor) throws Exception {
          //Checking if connection is still up and reconnecting if not
--        if(!connection.isConnected()){
--            try{
--                connection= plcDriverManager.getConnection(uri.replaceFirst("plc4x:/?/?", ""));
--            }
--            catch (Exception e){
++        if (!connection.isConnected()) {
++            try {
++                connection = plcDriverManager.getConnection(uri.replaceFirst("plc4x:/?/?", ""));
++            } catch (Exception e) {
                  e.printStackTrace();
              }
          }
@@@ -140,11 -103,11 +136,10 @@@
      @Override
      public PollingConsumer createPollingConsumer() throws Exception {
          //Checking if connection is still up and reconnecting if not
--        if(!connection.isConnected()){
--            try{
--                connection= plcDriverManager.getConnection(uri.replaceFirst("plc4x:/?/?", ""));
--            }
--            catch (Exception e){
++        if (!connection.isConnected()) {
++            try {
++                connection = plcDriverManager.getConnection(uri.replaceFirst("plc4x:/?/?", ""));
++            } catch (Exception e) {
                  e.printStackTrace();
              }
          }
@@@ -168,11 -131,11 +163,11 @@@
          this.driver = driver;
      }
  
--    public List<TagData> getTags() {
++    public Map<String, Object> getTags() {
          return tags;
      }
  
--    public void setTags(List<TagData> tags) {
++    public void setTags(Map<String, Object> tags) {
          this.tags = tags;
      }
  
@@@ -195,20 -158,20 +190,19 @@@
  
      @Override
      public int hashCode() {
--        return Objects.hash(super.hashCode(), getDriver(), getTags(),getPlcDriverManager());
++        return Objects.hash(super.hashCode(), getDriver(), getTags(), getPlcDriverManager());
      }
  
      @Override
--    public void doStop(){
++    public void doStop() {
          //Shutting down the connection when leaving the Context
--        try{
--            if(connection!=null){
--                if(connection.isConnected()){
++        try {
++            if (connection != null) {
++                if (connection.isConnected()) {
                      connection.close();
                  }
              }
--        }
--        catch (Exception e){
++        } catch (Exception e) {
              e.printStackTrace();
          }
      }
diff --cc plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XPollingConsumer.java
index a261606,a261606..c6c369e
--- a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XPollingConsumer.java
+++ b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XPollingConsumer.java
@@@ -26,8 -26,8 +26,14 @@@ import org.apache.camel.spi.ExceptionHa
  import org.apache.camel.support.LoggingExceptionHandler;
  import org.apache.plc4x.java.api.PlcConnection;
  import org.apache.plc4x.java.api.exceptions.PlcException;
++import org.apache.plc4x.java.api.exceptions.PlcIncompatibleDatatypeException;
  import org.apache.plc4x.java.api.messages.PlcReadRequest;
  import org.apache.plc4x.java.api.messages.PlcReadResponse;
++import org.apache.plc4x.java.scraper.config.ScraperConfiguration;
++import org.apache.plc4x.java.scraper.exception.ScraperException;
++import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl;
++import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollector;
++import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollectorImpl;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -44,14 -44,14 +50,17 @@@ public class Plc4XPollingConsumer imple
      private ExceptionHandler exceptionHandler;
      private PlcConnection plcConnection;
      private PlcReadRequest.Builder requestBuilder;
--    private Map parameters;
--
++    private  Map<String,Object> tags;
++    private String trigger;
  
++//TODO Is this still needed with the scraper working?
      public Plc4XPollingConsumer(Plc4XEndpoint endpoint) throws PlcException {
          plc4XEndpoint=endpoint;
          this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass());
          String plc4xURI = endpoint.getEndpointUri().replaceFirst("plc4x:/?/?", "");
          this.plcConnection = endpoint.getConnection();
++        this.tags = endpoint.getTags();
++        this.trigger= endpoint.getTrigger();
      }
  
      @Override
@@@ -73,7 -73,7 +82,7 @@@
      }
  
      @Override
--    public Exchange receive() {
++    public Exchange receive() {/**
          Exchange exchange = plc4XEndpoint.createExchange();
          try {
              PlcReadResponse read = createReadRequest().execute().get();
@@@ -96,7 -96,7 +105,8 @@@
          } catch (ExecutionException e) {
              exchange.setException(e);
          }
--        return exchange;
++        return exchange;*/
++    return null;
      }
  
      @Override
@@@ -105,7 -105,7 +115,7 @@@
      }
  
      @Override
--    public Exchange receive(long timeout) {
++    public Exchange receive(long timeout) {/**
          Exchange exchange = plc4XEndpoint.createExchange();
          CompletableFuture<? extends PlcReadResponse> read = createReadRequest().execute();
          try {
@@@ -129,11 -129,11 +139,12 @@@
              } catch(ExecutionException | TimeoutException e){
                  exchange.setException(e);
              }
--        return exchange;
++        return exchange;*/
++    return null;
      }
  
  
--    private PlcReadRequest createReadRequest() {
++    private PlcReadRequest createReadRequest() {/**
          requestBuilder = plcConnection.readRequestBuilder();
          if (plc4XEndpoint.getTags().size()>1){
              for(TagData tag : plc4XEndpoint.getTags()){
@@@ -145,7 -145,7 +156,7 @@@
              requestBuilder.addItem(tag.getTagName(),tag.getQuery());
          }
          return requestBuilder.build();
--    }
++    */return null;}
  
      private Object unwrapIfSingle(Collection collection) {
          if (collection.isEmpty()) {
diff --cc plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XProducer.java
index 2dd64ba,2dd64ba..151d83b
--- a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XProducer.java
+++ b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XProducer.java
@@@ -22,6 -22,6 +22,7 @@@ import org.apache.camel.AsyncCallback
  import org.apache.camel.Exchange;
  import org.apache.camel.Message;
  import org.apache.camel.support.DefaultAsyncProducer;
++import org.apache.commons.math3.util.Pair;
  import org.apache.plc4x.java.api.PlcConnection;
  import org.apache.plc4x.java.api.exceptions.PlcException;
  import org.apache.plc4x.java.api.exceptions.PlcInvalidFieldException;
@@@ -30,7 -30,7 +31,7 @@@ import org.apache.plc4x.java.api.messag
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
--import java.util.List;
++import java.util.Map;
  import java.util.concurrent.CompletableFuture;
  import java.util.concurrent.atomic.AtomicInteger;
  
@@@ -54,20 -54,20 +55,19 @@@ public class Plc4XProducer extends Defa
          Message in = exchange.getIn();
          Object body = in.getBody();
          PlcWriteRequest.Builder builder = plcConnection.writeRequestBuilder();
--        if (body instanceof List) { //Check if we have a List
--            if(((List) body).get(0) instanceof TagData){    //Check if this List contains TagData
--                List<TagData> tags =(List<TagData>) body;
--                for(TagData tag : tags){
--                    builder.addItem(tag.getTagName(),tag.getQuery(),tag.getValue());
--                }
++        if (body instanceof Map) { //Check if we have a Map
++            Map<String, Map.Entry<String, Object>> tags = (Map<String, Map.Entry<String, Object>>) body;
++            for (Map.Entry<String, Map.Entry<String, Object>> entry : tags.entrySet()) {
++                //Tags are stored like this --> Map<Tagname,Entry<Query,Value>> for writing
++                String name = entry.getKey();
++                String query = entry.getValue().getKey();
++                Object value = entry.getValue().getValue();
++                builder.addItem(name,query,value);
              }
--            else {
--                throw new PlcInvalidFieldException("Parameter 'tags' has to be a List of TagData");
--            }
--        }
--        else {
--            throw new PlcInvalidFieldException("Parameter 'tags' has to be a List");
++        } else {
++            throw new PlcInvalidFieldException("Parameter 'tags' has to be a List of TagData");
          }
++
          CompletableFuture<? extends PlcWriteResponse> completableFuture = builder.build().execute();
          int currentlyOpenRequests = openRequests.incrementAndGet();
          try {
diff --cc plc4j/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XComponentTest.java
index 168e2e5,168e2e5..7d8f78d
--- a/plc4j/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XComponentTest.java
+++ b/plc4j/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XComponentTest.java
@@@ -25,10 -25,10 +25,7 @@@ import org.apache.camel.test.junit5.Cam
  import org.apache.plc4x.java.api.model.PlcField;
  import org.junit.jupiter.api.Test;
  
--import java.util.ArrayList;
--import java.util.Arrays;
--import java.util.Collections;
--import java.util.List;
++import java.util.*;
  import java.util.concurrent.TimeUnit;
  
  public class Plc4XComponentTest extends CamelTestSupport {
@@@ -49,9 -49,9 +46,8 @@@
      protected RouteBuilder createRouteBuilder() {
          return new RouteBuilder() {
              public void configure() {
--                List<TagData> tags = new ArrayList<>();
--                tags.add(new TagData("testTagName","testTagAddress"));
--                tags.add(new TagData("testTagName2","testTagAddress2"));
++               Map<String,Object> tags = new HashMap<>();
++               tags.put("Test1","%TestQuery");
                  Plc4XEndpoint producer = getContext().getEndpoint("plc4x:mock:10.10.10.1/1/1", Plc4XEndpoint.class);
                  producer.setTags(tags);
                  from("direct:plc4x")
diff --cc plc4j/karaf-features/camel/pom.xml
index bbedb00,bbedb00..8c17998
--- a/plc4j/karaf-features/camel/pom.xml
+++ b/plc4j/karaf-features/camel/pom.xml
@@@ -29,7 -29,7 +29,7 @@@
  
    <modelVersion>4.0.0</modelVersion>
  
--  <artifactId>camel-feature</artifactId>
++  <artifactId>camel-plc4x</artifactId>
    <name>PLC4J: Karaf-Features: Camel</name>
    <packaging>pom</packaging>
  
diff --cc plc4j/tools/scraper/src/test/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImplTest.java
index a35dd12,a35dd12..691fd7f
--- a/plc4j/tools/scraper/src/test/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImplTest.java
+++ b/plc4j/tools/scraper/src/test/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImplTest.java
@@@ -32,6 -32,6 +32,7 @@@ import org.apache.plc4x.java.scraper.tr
  import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollectorImpl;
  import org.apache.plc4x.java.spi.messages.utils.ResponseItem;
  import org.junit.jupiter.api.BeforeEach;
++import org.junit.jupiter.api.Test;
  import org.mockito.Mockito;
  import org.mockito.invocation.InvocationOnMock;
  import org.mockito.stubbing.Answer;
@@@ -73,7 -73,7 +74,7 @@@ public class TriggeredScraperImplTest 
      /**
       * Test is added because we assume some strange behavior.
       */
--    //@Test
++    @Test
      public void scrapeMultipleTargets() throws ScraperException, IOException, InterruptedException {
          // Prepare the Mocking
          // Scrate Jobs 1 and 2
diff --cc plc4j/tools/scraper/src/test/resources/example_triggered_scraper.yml
index ec79e17,ec79e17..30ab0af
--- a/plc4j/tools/scraper/src/test/resources/example_triggered_scraper.yml
+++ b/plc4j/tools/scraper/src/test/resources/example_triggered_scraper.yml
@@@ -29,14 -29,14 +29,14 @@@ jobs
        test1: '%DB810:DBB0:USINT'
  
    - name: triggered-demo-job1
--    triggerConfig: (S7_TRIGGER_VAR,10,(%M0.3:BOOL)==(true))
++    triggerConfig: (TRIGGER_VAR,10,(%M0.3:BOOL)==(true))
      sources:
        - S7_PI
      fields:
        test1: '%DB810:DBW0:INT'
  
    - name: triggered-demo-job2
--    triggerConfig: (S7_TRIGGER_VAR,1000,(%M0.7:BOOL)==(true))
++    triggerConfig: (TRIGGER_VAR,1000,(%M0.7:BOOL)==(true))
      sources:
        - S7_PI
      fields:
@@@ -60,14 -60,14 +60,14 @@@
  
  
    - name: triggered-demo-job3-prev_value
--    triggerConfig: (S7_TRIGGER_VAR,500,(%M0:USINT)>=(PREV))
++    triggerConfig: (TRIGGER_VAR,500,(%M0:USINT)>=(PREV))
      sources:
        - S7_PI
      fields:
        test1: '%DB810:DBW0:INT'
  
    - name: triggered-demo-job4-combinded-condition
--    triggerConfig: (S7_TRIGGER_VAR,5,(%M0.1:BOOL)==(true)OR(%M0.2:BOOL)==(true))
++    triggerConfig: (TRIGGER_VAR,5,(%M0.1:BOOL)==(true)OR(%M0.2:BOOL)==(true))
      sources:
        - S7_PI
      fields:
diff --cc plc4j/tools/scraper/src/test/resources/mock-scraper-config.yml
index face235,face235..af9163a
--- a/plc4j/tools/scraper/src/test/resources/mock-scraper-config.yml
+++ b/plc4j/tools/scraper/src/test/resources/mock-scraper-config.yml
@@@ -40,7 -40,7 +40,7 @@@ jobs
  
  
    - name: triggered-demo-job1
--    triggerConfig: (S7_TRIGGER_VAR,10,(%M0.3:BOOL)==(true))
++    triggerConfig: (TRIGGER_VAR,10,(%M0.3:BOOL)==(true))
      sources:
        - MOCK_1
        - MOCK_2
@@@ -49,7 -49,7 +49,7 @@@
  
  
    - name: triggered-demo-job2
--    triggerConfig: (S7_TRIGGER_VAR,10,(%M0.7:BOOL)==(true))
++    triggerConfig: (TRIGGER_VAR,10,(%M0.7:BOOL)==(true))
      sources:
        - MOCK_1
        - MOCK_2