You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by er...@apache.org on 2020/05/15 10:40:20 UTC
[plc4x] 05/08: Camel using Scraper -trigger and period parameters
to use Triggered Scraper
This is an automated email from the ASF dual-hosted git repository.
erobinet pushed a commit to branch feature/scraper
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 8a1db07fa1a845a52339ed37676f4a9737712438
Merge: e5d3cdd 2dbaa94
Author: Etienne Robinet <et...@gmail.com>
AuthorDate: Tue May 12 14:03:37 2020 +0200
Camel using Scraper
-trigger and period parameters to use Triggered Scraper
.../org/apache/plc4x/java/mock/MockDriver.java | 6 ++
.../apache/plc4x/java/opcua/OpcuaPlcDriver.java | 1 -
.../src/main/resources/config.yaml | 10 ---
plc4j/integrations/apache-camel/pom.xml | 6 ++
.../java/org/apache/plc4x/camel/Constants.java | 3 +-
.../org/apache/plc4x/camel/Plc4XComponent.java | 14 ++--
.../java/org/apache/plc4x/camel/Plc4XConsumer.java | 74 ++++++++---------
.../java/org/apache/plc4x/camel/Plc4XEndpoint.java | 94 ++++++++++------------
.../apache/plc4x/camel/Plc4XPollingConsumer.java | 27 +++++--
.../java/org/apache/plc4x/camel/Plc4XProducer.java | 26 +++---
.../org/apache/plc4x/camel/Plc4XComponentTest.java | 10 +--
plc4j/karaf-features/camel/pom.xml | 2 +-
.../triggeredscraper/TriggeredScraperImplTest.java | 3 +-
.../test/resources/example_triggered_scraper.yml | 8 +-
.../src/test/resources/mock-scraper-config.yml | 4 +-
.../asciidoc/developers/release/validation.adoc | 5 ++
16 files changed, 149 insertions(+), 144 deletions(-)
diff --cc plc4j/drivers/mock/src/main/java/org/apache/plc4x/java/mock/MockDriver.java
index 3878fa0,3878fa0..e04a55d
--- a/plc4j/drivers/mock/src/main/java/org/apache/plc4x/java/mock/MockDriver.java
+++ b/plc4j/drivers/mock/src/main/java/org/apache/plc4x/java/mock/MockDriver.java
@@@ -23,6 -23,6 +23,7 @@@ import org.apache.plc4x.java.api.authen
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.mock.connection.MockConnection;
++import org.apache.plc4x.java.mock.field.MockField;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@@ -60,4 -60,4 +61,9 @@@ public class MockDriver implements PlcD
return connectionMap.computeIfAbsent(deviceName, name -> new MockConnection(authentication));
}
++ @Override
++ public MockField prepareField(String query){
++ return new MockField(query);
++ }
++
}
diff --cc plc4j/integrations/apache-camel/pom.xml
index 0368d98,cfa551a..fced3e1
--- a/plc4j/integrations/apache-camel/pom.xml
+++ b/plc4j/integrations/apache-camel/pom.xml
@@@ -128,18 -128,6 +128,24 @@@
<version>3.1.0</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.plc4x</groupId>
+ <artifactId>plc4j-connection-pool</artifactId>
+ <version>0.7.0-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.plc4x</groupId>
+ <artifactId>plc4j-scraper</artifactId>
+ <version>0.7.0-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
++ <dependency>
++ <groupId>org.apache.plc4x</groupId>
++ <artifactId>plc4j-scraper</artifactId>
++ <version>0.7.0-SNAPSHOT</version>
++ <scope>compile</scope>
++ </dependency>
</dependencies>
<dependencyManagement>
diff --cc plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Constants.java
index 733e269,733e269..e5bfd12
--- a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Constants.java
+++ b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Constants.java
@@@ -22,7 -22,7 +22,8 @@@ public class Constants
public static final String FIELD_NAME_HEADER = "fieldName";
public static final String FIELD_QUERY_HEADER = "fieldQuery";
--
++ public final static String TRIGGER = "TRIGGER_VAR";
++ public final static String PLC_NAME = "PLC";
private Constants() {
throw new IllegalStateException("Utility class!");
}
diff --cc plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XComponent.java
index f7e705f,82763ff..b629bd2
--- a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XComponent.java
+++ b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XComponent.java
@@@ -21,11 -21,11 +21,10 @@@ package org.apache.plc4x.camel
import org.apache.camel.Endpoint;
import org.apache.camel.support.DefaultComponent;
import org.apache.camel.util.IntrospectionSupport;
++
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
--import java.util.ArrayList;
--import java.util.List;
import java.util.Map;
public class Plc4XComponent extends DefaultComponent {
@@@ -34,18 -34,10 +33,21 @@@
@Override
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
Endpoint endpoint = new Plc4XEndpoint(uri, this);
-- List<TagData>tags = getAndRemoveOrResolveReferenceParameter(parameters,"tags", List.class);
++ //Tags have a Name, a query and an optional value (for writing)
++ //Reading --> Map<String,String>
++ //Writing --> Map<String,Map.Entry<String,Object>>
++ Map<String, Object> tags = getAndRemoveOrResolveReferenceParameter(parameters, "tags", Map.class);
if(tags!=null){
((Plc4XEndpoint)endpoint).setTags(tags);
}
+ String trigger = getAndRemoveOrResolveReferenceParameter(parameters,"trigger",String.class);
+ if(trigger!=null){
+ ((Plc4XEndpoint)endpoint).setTrigger(trigger);
+ }
- int period = getAndRemoveOrResolveReferenceParameter(parameters,"period",Integer.class);
- if(period!=0){
- ((Plc4XEndpoint)endpoint).setPeriod(period);
++ Object period = getAndRemoveOrResolveReferenceParameter(parameters,"period",Integer.class);
++ if(period!=null && period instanceof Integer){
++ ((Plc4XEndpoint)endpoint).setPeriod((int)period);
+ }
setProperties(endpoint,parameters);
return endpoint;
}
diff --cc plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
index 132aed8,18161db..e069e2e
--- a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
+++ b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
@@@ -25,17 -25,8 +25,18 @@@ import org.apache.camel.support.Default
import org.apache.camel.spi.ExceptionHandler;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.exceptions.PlcException;
++import org.apache.plc4x.java.api.exceptions.PlcIncompatibleDatatypeException;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
+import org.apache.plc4x.java.scraper.ScrapeJob;
+import org.apache.plc4x.java.scraper.config.JobConfigurationImpl;
+import org.apache.plc4x.java.scraper.config.ScraperConfiguration;
+import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImpl;
+import org.apache.plc4x.java.scraper.exception.ScraperException;
+import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScrapeJobImpl;
+import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl;
+import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollector;
+import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollectorImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@@ -47,26 -41,19 +48,23 @@@ public class Plc4XConsumer extends Defa
private ExceptionHandler exceptionHandler;
private PlcConnection plcConnection;
-- private List<TagData> tags;
- private Map<String,String> fields;
- private Map parameters;
++ private Map<String,Object> tags;
+ private String trigger;
private PlcSubscriptionResponse subscriptionResponse;
private Plc4XEndpoint plc4XEndpoint;
private ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
private ScheduledFuture<?> future;
- private final static String TRIGGER = "TRIGGER_VAR";
- private final static String PLC_NAME = "PLC";
++
+
public Plc4XConsumer(Plc4XEndpoint endpoint, Processor processor) throws PlcException {
super(endpoint, processor);
plc4XEndpoint =endpoint;
this.plcConnection = endpoint.getConnection();
this.tags = endpoint.getTags();
- this.fields = TagData.toMap(this.tags);
+ this.trigger= endpoint.getTrigger();
+ plc4XEndpoint=endpoint;
}
@Override
@@@ -89,79 -76,41 +87,75 @@@
@Override
protected void doStart() throws InterruptedException, ExecutionException {
- PlcReadRequest.Builder builder = plcConnection.readRequestBuilder();
- if (tags.size()==1){
- TagData tag = tags.get(0);
- builder.addItem(tag.getTagName(),tag.getQuery());
-
- }
- else{
- for(TagData tag : tags){
- builder.addItem(tag.getTagName(),tag.getQuery());
- }
- }
- PlcReadRequest request = builder.build();
- future = executorService.schedule(() -> {
- request.execute().thenAccept(response -> {
+ if(trigger==null) {
+ PlcReadRequest.Builder builder = plcConnection.readRequestBuilder();
- if (tags.size() == 1) {
- TagData tag = tags.get(0);
- builder.addItem(tag.getTagName(), tag.getQuery());
-
- } else {
- for (TagData tag : tags) {
- builder.addItem(tag.getTagName(), tag.getQuery());
++ for( String tag : tags.keySet()){
++ try{
++ String query = (String)tags.get(tag);
++ builder.addItem(tag,query);
++ }
++ catch (PlcIncompatibleDatatypeException e){
++ LOGGER.error("For consumer, please use Map<String,String>, currently using {}",tags.getClass().getSimpleName());
+ }
+ }
+ PlcReadRequest request = builder.build();
+ future = executorService.schedule(() -> {
+ request.execute().thenAccept(response -> {
try {
Exchange exchange = plc4XEndpoint.createExchange();
- if (tags.size() > 1) {
- if (tags.size()>1){
-- List<TagData> values = new ArrayList<>();
- for (TagData tag : tags) {
- for(TagData tag : tags){
-- tag.setValue(response.getObject(tag.getTagName()));
-- values.add(tag);
-- }
-- exchange.getIn().setBody(values);
- } else {
- }
- else {
-- TagData tag = tags.get(0);
-- tag.setValue(response.getAllObjects(tag.getTagName()));
-- exchange.getIn().setBody(tag);
++ Map<String,Object> rsp = new HashMap<>();
++ for(String field : response.getFieldNames()){
++ rsp.put(field,response.getObject(field));
}
++ exchange.getIn().setBody(rsp);
getProcessor().process(exchange);
} catch (Exception e) {
exceptionHandler.handleException(e);
}
});
- }, 500, TimeUnit.MILLISECONDS);
+ }, 500, TimeUnit.MILLISECONDS);
+ }
+ else{
-
- ScraperConfiguration configuration = getScraperConfig(TagData.toMap(plc4XEndpoint.getTags()));
- TriggerCollector collector = new TriggerCollectorImpl(plc4XEndpoint.getPlcDriverManager());
+ try {
++ ScraperConfiguration configuration = getScraperConfig(validateTags());
++ TriggerCollector collector = new TriggerCollectorImpl(plc4XEndpoint.getPlcDriverManager());
+ TriggeredScraperImpl scraper = new TriggeredScraperImpl(configuration, (job, alias, response) -> {
++ LOGGER.info("SCRAPER {} {} {}",job,alias,response);
+ try {
+ Exchange exchange = plc4XEndpoint.createExchange();
- if (tags.size() > 1) {
- List<TagData> values = new ArrayList<>();
- for (TagData tag : tags) {
- tag.setValue(response.get(tag.getTagName()));
- values.add(tag);
- }
- exchange.getIn().setBody(values);
- } else {
- TagData tag = tags.get(0);
- tag.setValue(response.get(tag.getTagName()));
- exchange.getIn().setBody(tag);
- }
++ exchange.getIn().setBody(response);
+ getProcessor().process(exchange);
+ } catch (Exception e) {
+ exceptionHandler.handleException(e);
+ };
+ },collector);
+ scraper.start();
+ collector.start();
+ } catch (ScraperException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
++ private Map<String, String> validateTags() {
++ Map<String, String> map = new HashMap<>();
++ for(Map.Entry<String,Object>tag: tags.entrySet()){
++ if(tag.getValue() instanceof String){
++ map.put(tag.getKey(),(String)tag.getValue());
++ }
++ }
++ if(map.size()!=tags.size()){
++ LOGGER.error("At least one entry does not match the format : Map.Entry<String,String> ");
++ return null;
++ }
++ else return map;
++ }
++
+ private ScraperConfigurationTriggeredImpl getScraperConfig(Map<String,String> tagList){
+ String config = "(TRIGGER_VAR,"+plc4XEndpoint.getPeriod()+",("+ plc4XEndpoint.getTrigger() +")==(true))";
- List<JobConfigurationImpl> job = Collections.singletonList(new JobConfigurationImpl("PLC4X-Camel",config,0,Collections.singletonList(PLC_NAME),tagList));
- Map<String,String> source = Collections.singletonMap(PLC_NAME,plc4XEndpoint.getUri());
++ List<JobConfigurationImpl> job = Collections.singletonList(new JobConfigurationImpl("PLC4X-Camel",config,0,Collections.singletonList(Constants.PLC_NAME),tagList));
++ Map<String,String> source = Collections.singletonMap(Constants.PLC_NAME,plc4XEndpoint.getUri());
+ return new ScraperConfigurationTriggeredImpl(source,job);
}
@Override
diff --cc plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XEndpoint.java
index 5e493ec,c7e6a0e..e14e9f5
--- a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XEndpoint.java
+++ b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XEndpoint.java
@@@ -24,6 -24,6 +24,7 @@@ import org.apache.camel.spi.Metadata
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
++import org.apache.commons.math3.util.Pair;
import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
@@@ -36,67 -36,30 +37,64 @@@ import java.util.Objects
@UriEndpoint(scheme = "plc4x", title = "PLC4X", syntax = "plc4x:driver", label = "plc4x")
public class Plc4XEndpoint extends DefaultEndpoint {
-- @UriPath @Metadata(required = true)
++ @UriPath
++ @Metadata(required = true)
private String driver;
@UriParam
-- private List<TagData> tags;
++ private Map<String, Object> tags;
+ @UriParam
+ private String trigger;
+
+ @UriParam
+ private int period;
+
+ public int getPeriod() {
+ return period;
+ }
+
+ public void setPeriod(int period) {
+ this.period = period;
+ }
-- private final PlcDriverManager plcDriverManager;
-- private PlcConnection connection;
++ private PlcDriverManager plcDriverManager;
++ private PlcConnection connection;
private String uri;
+ public String getUri() {
+ return uri;
+ }
+
+ public String getTrigger() {
+ return trigger;
+ }
+
+ public void setTrigger(String trigger) {
+ this.trigger = trigger;
++ plcDriverManager = new PooledPlcDriverManager();
++ String plc4xURI = uri.replaceFirst("plc4x:/?/?", "");
++ uri=plc4xURI;
++ try {
++ connection = plcDriverManager.getConnection(plc4xURI);
++ } catch (PlcConnectionException e) {
++ e.printStackTrace();
++ }
+ }
+
public Plc4XEndpoint(String endpointUri, Component component) {
super(endpointUri, component);
- if(trigger==null) {
- plcDriverManager = new PlcDriverManager();
- uri = endpointUri;
- //Here we establish the connection in the endpoint, as it is created once during the context
- // to avoid disconnecting and reconnecting for every request
- try {
- String plc4xURI = uri.replaceFirst("plc4x:/?/?", "");
- uri=plc4xURI;
- connection = plcDriverManager.getConnection(plc4xURI);
-
- } catch (PlcConnectionException e) {
- e.printStackTrace();
- }
- }
- else {
- plcDriverManager = new PooledPlcDriverManager();
- plcDriverManager= new PlcDriverManager();
++ plcDriverManager = new PlcDriverManager();
+ uri = endpointUri;
-
+ //Here we establish the connection in the endpoint, as it is created once during the context
+ // to avoid disconnecting and reconnecting for every request
+ try {
String plc4xURI = uri.replaceFirst("plc4x:/?/?", "");
- uri=plc4xURI;
- try {
- connection = plcDriverManager.getConnection(plc4xURI);
- } catch (PlcConnectionException e) {
- e.printStackTrace();
- }
++ uri = plc4xURI;
+ connection = plcDriverManager.getConnection(plc4xURI);
+
+ } catch (PlcConnectionException e) {
+ e.printStackTrace();
}
}
@@@ -112,11 -75,11 +110,10 @@@
@Override
public Producer createProducer() throws Exception {
//Checking if connection is still up and reconnecting if not
-- if(!connection.isConnected()){
-- try{
-- connection= plcDriverManager.getConnection(uri.replaceFirst("plc4x:/?/?", ""));
-- }
-- catch (Exception e){
++ if (!connection.isConnected()) {
++ try {
++ connection = plcDriverManager.getConnection(uri.replaceFirst("plc4x:/?/?", ""));
++ } catch (Exception e) {
e.printStackTrace();
}
}
@@@ -126,11 -89,11 +123,10 @@@
@Override
public Consumer createConsumer(Processor processor) throws Exception {
//Checking if connection is still up and reconnecting if not
-- if(!connection.isConnected()){
-- try{
-- connection= plcDriverManager.getConnection(uri.replaceFirst("plc4x:/?/?", ""));
-- }
-- catch (Exception e){
++ if (!connection.isConnected()) {
++ try {
++ connection = plcDriverManager.getConnection(uri.replaceFirst("plc4x:/?/?", ""));
++ } catch (Exception e) {
e.printStackTrace();
}
}
@@@ -140,11 -103,11 +136,10 @@@
@Override
public PollingConsumer createPollingConsumer() throws Exception {
//Checking if connection is still up and reconnecting if not
-- if(!connection.isConnected()){
-- try{
-- connection= plcDriverManager.getConnection(uri.replaceFirst("plc4x:/?/?", ""));
-- }
-- catch (Exception e){
++ if (!connection.isConnected()) {
++ try {
++ connection = plcDriverManager.getConnection(uri.replaceFirst("plc4x:/?/?", ""));
++ } catch (Exception e) {
e.printStackTrace();
}
}
@@@ -168,11 -131,11 +163,11 @@@
this.driver = driver;
}
-- public List<TagData> getTags() {
++ public Map<String, Object> getTags() {
return tags;
}
-- public void setTags(List<TagData> tags) {
++ public void setTags(Map<String, Object> tags) {
this.tags = tags;
}
@@@ -195,20 -158,20 +190,19 @@@
@Override
public int hashCode() {
-- return Objects.hash(super.hashCode(), getDriver(), getTags(),getPlcDriverManager());
++ return Objects.hash(super.hashCode(), getDriver(), getTags(), getPlcDriverManager());
}
@Override
-- public void doStop(){
++ public void doStop() {
//Shutting down the connection when leaving the Context
-- try{
-- if(connection!=null){
-- if(connection.isConnected()){
++ try {
++ if (connection != null) {
++ if (connection.isConnected()) {
connection.close();
}
}
-- }
-- catch (Exception e){
++ } catch (Exception e) {
e.printStackTrace();
}
}
diff --cc plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XPollingConsumer.java
index a261606,a261606..c6c369e
--- a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XPollingConsumer.java
+++ b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XPollingConsumer.java
@@@ -26,8 -26,8 +26,14 @@@ import org.apache.camel.spi.ExceptionHa
import org.apache.camel.support.LoggingExceptionHandler;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.exceptions.PlcException;
++import org.apache.plc4x.java.api.exceptions.PlcIncompatibleDatatypeException;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
++import org.apache.plc4x.java.scraper.config.ScraperConfiguration;
++import org.apache.plc4x.java.scraper.exception.ScraperException;
++import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl;
++import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollector;
++import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollectorImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@@ -44,14 -44,14 +50,17 @@@ public class Plc4XPollingConsumer imple
private ExceptionHandler exceptionHandler;
private PlcConnection plcConnection;
private PlcReadRequest.Builder requestBuilder;
-- private Map parameters;
--
++ private Map<String,Object> tags;
++ private String trigger;
++//TODO Is this still needed with the scraper working?
public Plc4XPollingConsumer(Plc4XEndpoint endpoint) throws PlcException {
plc4XEndpoint=endpoint;
this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass());
String plc4xURI = endpoint.getEndpointUri().replaceFirst("plc4x:/?/?", "");
this.plcConnection = endpoint.getConnection();
++ this.tags = endpoint.getTags();
++ this.trigger= endpoint.getTrigger();
}
@Override
@@@ -73,7 -73,7 +82,7 @@@
}
@Override
-- public Exchange receive() {
++ public Exchange receive() {/**
Exchange exchange = plc4XEndpoint.createExchange();
try {
PlcReadResponse read = createReadRequest().execute().get();
@@@ -96,7 -96,7 +105,8 @@@
} catch (ExecutionException e) {
exchange.setException(e);
}
-- return exchange;
++ return exchange;*/
++ return null;
}
@Override
@@@ -105,7 -105,7 +115,7 @@@
}
@Override
-- public Exchange receive(long timeout) {
++ public Exchange receive(long timeout) {/**
Exchange exchange = plc4XEndpoint.createExchange();
CompletableFuture<? extends PlcReadResponse> read = createReadRequest().execute();
try {
@@@ -129,11 -129,11 +139,12 @@@
} catch(ExecutionException | TimeoutException e){
exchange.setException(e);
}
-- return exchange;
++ return exchange;*/
++ return null;
}
-- private PlcReadRequest createReadRequest() {
++ private PlcReadRequest createReadRequest() {/**
requestBuilder = plcConnection.readRequestBuilder();
if (plc4XEndpoint.getTags().size()>1){
for(TagData tag : plc4XEndpoint.getTags()){
@@@ -145,7 -145,7 +156,7 @@@
requestBuilder.addItem(tag.getTagName(),tag.getQuery());
}
return requestBuilder.build();
-- }
++ */return null;}
private Object unwrapIfSingle(Collection collection) {
if (collection.isEmpty()) {
diff --cc plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XProducer.java
index 2dd64ba,2dd64ba..151d83b
--- a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XProducer.java
+++ b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XProducer.java
@@@ -22,6 -22,6 +22,7 @@@ import org.apache.camel.AsyncCallback
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.support.DefaultAsyncProducer;
++import org.apache.commons.math3.util.Pair;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.exceptions.PlcException;
import org.apache.plc4x.java.api.exceptions.PlcInvalidFieldException;
@@@ -30,7 -30,7 +31,7 @@@ import org.apache.plc4x.java.api.messag
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
--import java.util.List;
++import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
@@@ -54,20 -54,20 +55,19 @@@ public class Plc4XProducer extends Defa
Message in = exchange.getIn();
Object body = in.getBody();
PlcWriteRequest.Builder builder = plcConnection.writeRequestBuilder();
-- if (body instanceof List) { //Check if we have a List
-- if(((List) body).get(0) instanceof TagData){ //Check if this List contains TagData
-- List<TagData> tags =(List<TagData>) body;
-- for(TagData tag : tags){
-- builder.addItem(tag.getTagName(),tag.getQuery(),tag.getValue());
-- }
++ if (body instanceof Map) { //Check if we have a Map
++ Map<String, Map.Entry<String, Object>> tags = (Map<String, Map.Entry<String, Object>>) body;
++ for (Map.Entry<String, Map.Entry<String, Object>> entry : tags.entrySet()) {
++ //Tags are stored like this --> Map<Tagname,Entry<Query,Value>> for writing
++ String name = entry.getKey();
++ String query = entry.getValue().getKey();
++ Object value = entry.getValue().getValue();
++ builder.addItem(name,query,value);
}
-- else {
-- throw new PlcInvalidFieldException("Parameter 'tags' has to be a List of TagData");
-- }
-- }
-- else {
-- throw new PlcInvalidFieldException("Parameter 'tags' has to be a List");
++ } else {
++ throw new PlcInvalidFieldException("Parameter 'tags' has to be a List of TagData");
}
++
CompletableFuture<? extends PlcWriteResponse> completableFuture = builder.build().execute();
int currentlyOpenRequests = openRequests.incrementAndGet();
try {
diff --cc plc4j/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XComponentTest.java
index 168e2e5,168e2e5..7d8f78d
--- a/plc4j/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XComponentTest.java
+++ b/plc4j/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XComponentTest.java
@@@ -25,10 -25,10 +25,7 @@@ import org.apache.camel.test.junit5.Cam
import org.apache.plc4x.java.api.model.PlcField;
import org.junit.jupiter.api.Test;
--import java.util.ArrayList;
--import java.util.Arrays;
--import java.util.Collections;
--import java.util.List;
++import java.util.*;
import java.util.concurrent.TimeUnit;
public class Plc4XComponentTest extends CamelTestSupport {
@@@ -49,9 -49,9 +46,8 @@@
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
-- List<TagData> tags = new ArrayList<>();
-- tags.add(new TagData("testTagName","testTagAddress"));
-- tags.add(new TagData("testTagName2","testTagAddress2"));
++ Map<String,Object> tags = new HashMap<>();
++ tags.put("Test1","%TestQuery");
Plc4XEndpoint producer = getContext().getEndpoint("plc4x:mock:10.10.10.1/1/1", Plc4XEndpoint.class);
producer.setTags(tags);
from("direct:plc4x")
diff --cc plc4j/karaf-features/camel/pom.xml
index bbedb00,bbedb00..8c17998
--- a/plc4j/karaf-features/camel/pom.xml
+++ b/plc4j/karaf-features/camel/pom.xml
@@@ -29,7 -29,7 +29,7 @@@
<modelVersion>4.0.0</modelVersion>
-- <artifactId>camel-feature</artifactId>
++ <artifactId>camel-plc4x</artifactId>
<name>PLC4J: Karaf-Features: Camel</name>
<packaging>pom</packaging>
diff --cc plc4j/tools/scraper/src/test/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImplTest.java
index a35dd12,a35dd12..691fd7f
--- a/plc4j/tools/scraper/src/test/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImplTest.java
+++ b/plc4j/tools/scraper/src/test/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImplTest.java
@@@ -32,6 -32,6 +32,7 @@@ import org.apache.plc4x.java.scraper.tr
import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollectorImpl;
import org.apache.plc4x.java.spi.messages.utils.ResponseItem;
import org.junit.jupiter.api.BeforeEach;
++import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@@ -73,7 -73,7 +74,7 @@@ public class TriggeredScraperImplTest
/**
* Test is added because we assume some strange behavior.
*/
-- //@Test
++ @Test
public void scrapeMultipleTargets() throws ScraperException, IOException, InterruptedException {
// Prepare the Mocking
// Scrate Jobs 1 and 2
diff --cc plc4j/tools/scraper/src/test/resources/example_triggered_scraper.yml
index ec79e17,ec79e17..30ab0af
--- a/plc4j/tools/scraper/src/test/resources/example_triggered_scraper.yml
+++ b/plc4j/tools/scraper/src/test/resources/example_triggered_scraper.yml
@@@ -29,14 -29,14 +29,14 @@@ jobs
test1: '%DB810:DBB0:USINT'
- name: triggered-demo-job1
-- triggerConfig: (S7_TRIGGER_VAR,10,(%M0.3:BOOL)==(true))
++ triggerConfig: (TRIGGER_VAR,10,(%M0.3:BOOL)==(true))
sources:
- S7_PI
fields:
test1: '%DB810:DBW0:INT'
- name: triggered-demo-job2
-- triggerConfig: (S7_TRIGGER_VAR,1000,(%M0.7:BOOL)==(true))
++ triggerConfig: (TRIGGER_VAR,1000,(%M0.7:BOOL)==(true))
sources:
- S7_PI
fields:
@@@ -60,14 -60,14 +60,14 @@@
- name: triggered-demo-job3-prev_value
-- triggerConfig: (S7_TRIGGER_VAR,500,(%M0:USINT)>=(PREV))
++ triggerConfig: (TRIGGER_VAR,500,(%M0:USINT)>=(PREV))
sources:
- S7_PI
fields:
test1: '%DB810:DBW0:INT'
- name: triggered-demo-job4-combinded-condition
-- triggerConfig: (S7_TRIGGER_VAR,5,(%M0.1:BOOL)==(true)OR(%M0.2:BOOL)==(true))
++ triggerConfig: (TRIGGER_VAR,5,(%M0.1:BOOL)==(true)OR(%M0.2:BOOL)==(true))
sources:
- S7_PI
fields:
diff --cc plc4j/tools/scraper/src/test/resources/mock-scraper-config.yml
index face235,face235..af9163a
--- a/plc4j/tools/scraper/src/test/resources/mock-scraper-config.yml
+++ b/plc4j/tools/scraper/src/test/resources/mock-scraper-config.yml
@@@ -40,7 -40,7 +40,7 @@@ jobs
- name: triggered-demo-job1
-- triggerConfig: (S7_TRIGGER_VAR,10,(%M0.3:BOOL)==(true))
++ triggerConfig: (TRIGGER_VAR,10,(%M0.3:BOOL)==(true))
sources:
- MOCK_1
- MOCK_2
@@@ -49,7 -49,7 +49,7 @@@
- name: triggered-demo-job2
-- triggerConfig: (S7_TRIGGER_VAR,10,(%M0.7:BOOL)==(true))
++ triggerConfig: (TRIGGER_VAR,10,(%M0.7:BOOL)==(true))
sources:
- MOCK_1
- MOCK_2