You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by er...@apache.org on 2020/03/31 07:13:39 UTC

[plc4x] branch camel updated (4d4efc5 -> 00efaea)

This is an automated email from the ASF dual-hosted git repository.

erobinet pushed a change to branch camel
in repository https://gitbox.apache.org/repos/asf/plc4x.git.


    from 4d4efc5  - Updates and fixes from Etienne (Still a WIP ...)
     new 3d6bdc9  -Updating Camel Component
     new 00efaea  Camel Component update

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/plc4x/camel/Constants.java     |  2 +-
 .../java/org/apache/plc4x/camel/Plc4XConsumer.java | 35 ++++++++-------
 .../java/org/apache/plc4x/camel/Plc4XEndpoint.java | 15 ++++---
 .../apache/plc4x/camel/Plc4XPollingConsumer.java   | 50 ++++++++++++----------
 .../java/org/apache/plc4x/camel/Plc4XProducer.java | 24 ++++++-----
 .../main/java/org/apache/plc4x/camel/TagData.java  | 43 ++++++++++++-------
 .../org/apache/plc4x/camel/Plc4XComponentTest.java | 20 +++++----
 .../org/apache/plc4x/camel/Plc4XConsumerTest.java  | 18 --------
 .../org/apache/plc4x/camel/Plc4XProducerTest.java  | 37 ++++++++--------
 pom.xml                                            | 13 +++++-
 10 files changed, 135 insertions(+), 122 deletions(-)
 copy sandbox/code-gen/src/main/java/org/apache/plc4x/codegen/ast/ConstantExpression.java => plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/TagData.java (56%)


[plc4x] 02/02: Camel Component update

Posted by er...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

erobinet pushed a commit to branch camel
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 00efaea16580888548382ebfc4afb1341bb2e94c
Merge: 3d6bdc9 4d4efc5
Author: etiennerobinet <61...@users.noreply.github.com>
AuthorDate: Tue Mar 31 09:13:04 2020 +0200

    Camel Component update

 pom.xml | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --cc pom.xml
index 775a101,775a101..13809bb
--- a/pom.xml
+++ b/pom.xml
@@@ -47,14 -47,14 +47,23 @@@
      <tag>HEAD</tag>
    </scm>
  
--  <!-- Only configure the site distribution as the rest is handled by the apache parent -->
++  <!-- Only configure the site distribution as the rest is handled by the apache parent
    <distributionManagement>
      <site>
        <id>apache.website</id>
        <url>scm:git:https://gitbox.apache.org/repos/asf/plc4x-website.git</url>
      </site>
++  </distributionManagement-->
++  <distributionManagement>
++    <snapshotRepository>
++      <id>snapshots</id>
++      <url>http://lugsrv1.gis.goodyear.com:8081/repository/snapshots/</url>
++    </snapshotRepository>
++    <repository>
++      <id>releases</id>
++      <url>http://lugsrv1.gis.goodyear.com:8081/repository/releases/</url>
++    </repository>
    </distributionManagement>
--
    <issueManagement>
      <system>Jira</system>
      <url>https://issues.apache.org/jira/browse/PLC4X</url>


[plc4x] 01/02: -Updating Camel Component

Posted by er...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

erobinet pushed a commit to branch camel
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 3d6bdc9c180fde49a3ce45d130e1951fc9aa3755
Author: Etienne Robinet <et...@gmail.com>
AuthorDate: Thu Mar 26 16:03:04 2020 +0100

    -Updating Camel Component
---
 .../java/org/apache/plc4x/camel/Constants.java     |  2 +-
 .../org/apache/plc4x/camel/Plc4XComponent.java     |  4 +-
 .../java/org/apache/plc4x/camel/Plc4XConsumer.java | 93 ++++++++++++----------
 .../java/org/apache/plc4x/camel/Plc4XEndpoint.java | 80 ++++++++++++++++---
 .../apache/plc4x/camel/Plc4XPollingConsumer.java   | 73 +++++++++++------
 .../java/org/apache/plc4x/camel/Plc4XProducer.java | 45 +++++------
 .../camel/{Plc4XComponent.java => TagData.java}    | 48 +++++++----
 .../services/org/apache/camel/component/plc4x      | 36 ++++-----
 .../org/apache/plc4x/camel/Plc4XComponentTest.java | 23 +++---
 .../org/apache/plc4x/camel/Plc4XConsumerTest.java  | 18 -----
 .../org/apache/plc4x/camel/Plc4XProducerTest.java  | 30 +++----
 11 files changed, 273 insertions(+), 179 deletions(-)

diff --git a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Constants.java b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Constants.java
index eefde57..733e269 100644
--- a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Constants.java
+++ b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Constants.java
@@ -22,7 +22,7 @@ public class Constants {
 
     public static final String FIELD_NAME_HEADER = "fieldName";
     public static final String FIELD_QUERY_HEADER = "fieldQuery";
-    
+
     private Constants() {
       throw new IllegalStateException("Utility class!");
     }
diff --git a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XComponent.java b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XComponent.java
index 71a252d..020168d 100644
--- a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XComponent.java
+++ b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XComponent.java
@@ -20,10 +20,13 @@ package org.apache.plc4x.camel;
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.support.DefaultComponent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
 public class Plc4XComponent extends DefaultComponent {
+    private static final Logger LOGGER = LoggerFactory.getLogger(Plc4XComponent.class);
 
     @Override
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
@@ -37,5 +40,4 @@ public class Plc4XComponent extends DefaultComponent {
         Plc4XEndpoint plc4XEndpoint = (Plc4XEndpoint) endpoint;
         plc4XEndpoint.setDriver(remaining.split(":")[0]);
     }
-
 }
\ No newline at end of file
diff --git a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
index b9414cd..36bc5b1 100644
--- a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
+++ b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
@@ -1,5 +1,5 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
+Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
@@ -25,37 +25,35 @@ import org.apache.camel.support.LoggingExceptionHandler;
 import org.apache.camel.support.service.ServiceSupport;
 import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcException;
-import org.apache.plc4x.java.api.messages.*;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.time.Duration;
-import java.time.temporal.ChronoUnit;
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.*;
+import java.util.concurrent.*;
 
-public class Plc4XConsumer extends ServiceSupport implements Consumer, java.util.function.Consumer<PlcSubscriptionEvent> {
+public class Plc4XConsumer extends ServiceSupport implements Consumer {
     private static final Logger LOGGER = LoggerFactory.getLogger(Plc4XConsumer.class);
 
     private Plc4XEndpoint endpoint;
     private AsyncProcessor processor;
     private ExceptionHandler exceptionHandler;
     private PlcConnection plcConnection;
-    private String fieldQuery;
+    private  List<TagData> tags;
     private Class<?> dataType;
     private PlcSubscriptionResponse subscriptionResponse;
 
+    private ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+    private ScheduledFuture<?> future;
+
     public Plc4XConsumer(Plc4XEndpoint endpoint, Processor processor) throws PlcException {
         this.endpoint = endpoint;
         this.dataType = endpoint.getDataType();
         this.processor = AsyncProcessorConverterHelper.convert(processor);
         this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass());
-        String plc4xURI = endpoint.getEndpointUri().replaceFirst("plc4x:/?/?", "");
-        this.plcConnection = endpoint.getPlcDriverManager().getConnection(plc4xURI);
-        this.fieldQuery = endpoint.getAddress();
+        this.plcConnection = endpoint.getConnection();
+        this.tags = endpoint.getTags();
     }
 
     @Override
@@ -78,37 +76,47 @@ public class Plc4XConsumer extends ServiceSupport implements Consumer, java.util
 
     @Override
     protected void doStart() throws InterruptedException, ExecutionException {
-        // TODO: Is it correct to only support one field?
-        PlcSubscriptionRequest request = plcConnection.subscriptionRequestBuilder()
-            .addCyclicField("default", fieldQuery, Duration.of(3, ChronoUnit.SECONDS)).build();
-        subscriptionResponse = request.execute().get();
-        // TODO: we need to return the plcSubscriptionResponse here too as we need this to unsubscribe...
-        // TODO: figure out what to do with this
-        // plcSubscriber.register(this, plcSubscriptionResponse.getSubscriptionHandles());
-    }
+        PlcReadRequest.Builder builder = plcConnection.readRequestBuilder();
+        if (tags.size()==1){
+            TagData tag = tags.get(0);
+            builder.addItem(tag.getTagName(),tag.getQuery());
 
-    @Override
-    protected void doStop() throws InterruptedException, ExecutionException, TimeoutException {
-        PlcUnsubscriptionRequest request = plcConnection.unsubscriptionRequestBuilder().addHandles(subscriptionResponse.getSubscriptionHandles()).build();
-        CompletableFuture<? extends PlcUnsubscriptionResponse> unsubscriptionFuture = request.execute();
-        /*PlcUnsubscriptionResponse unsubscriptionResponse =*/ unsubscriptionFuture.get(5, TimeUnit.SECONDS);
-        // TODO: Handle the response ...
-        try {
-            plcConnection.close();
-        } catch (Exception e) {
-            LOGGER.error("Error closing connection", e);
         }
+        else{
+           for(TagData tag : tags){
+               builder.addItem(tag.getTagName(),tag.getQuery());
+           }
+        }
+        PlcReadRequest request = builder.build();
+        future = executorService.schedule(() -> {
+            request.execute().thenAccept(response -> {
+                    try {
+                        Exchange exchange = endpoint.createExchange();
+                        if (tags.size()>1){
+                            List<TagData> values = new ArrayList<>();
+                            for(TagData tag : tags){
+                                values.add(tag);
+                            }
+                            exchange.getIn().setBody(values);
+                        }
+                        else {
+                            TagData tag = tags.get(0);
+                            tag.setValue(response.getAllObjects(tag.getTagName()));
+                            exchange.getIn().setBody(tag);
+                        }
+                        processor.process(exchange);
+                    } catch (Exception e) {
+                        exceptionHandler.handleException(e);
+                    }
+                });
+        }, 500, TimeUnit.MILLISECONDS);
     }
 
     @Override
-    public void accept(PlcSubscriptionEvent plcSubscriptionEvent) {
-        LOGGER.debug("Received {}", plcSubscriptionEvent);
-        try {
-            Exchange exchange = endpoint.createExchange();
-            exchange.getIn().setBody(unwrapIfSingle(plcSubscriptionEvent.getAllObjects("default")));
-            processor.process(exchange);
-        } catch (Exception e) {
-            exceptionHandler.handleException(e);
+    protected void doStop() throws InterruptedException, ExecutionException, TimeoutException {
+        // First stop the polling process
+        if (future != null) {
+            future.cancel(true);
         }
     }
 
@@ -124,7 +132,6 @@ public class Plc4XConsumer extends ServiceSupport implements Consumer, java.util
 
     @Override
     public Processor getProcessor() {
-        // TODO: No idea what to do here ...
-        return null;
+        return this.processor;
     }
-}
+}
\ No newline at end of file
diff --git a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XEndpoint.java b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XEndpoint.java
index 1d2e4dc..27dc8a0 100644
--- a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XEndpoint.java
+++ b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XEndpoint.java
@@ -25,7 +25,11 @@ import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
 import org.apache.camel.support.DefaultEndpoint;
 import org.apache.plc4x.java.PlcDriverManager;
+import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 @UriEndpoint(scheme = "plc4x", title = "PLC4X", syntax = "plc4x:driver", label = "plc4x")
@@ -45,7 +49,7 @@ public class Plc4XEndpoint extends DefaultEndpoint {
     @UriParam
     @Metadata(required = false)
     @SuppressWarnings("unused")
-    private String address;
+    private List<TagData> tags;
 
     /**
      * TODO: document me
@@ -53,27 +57,70 @@ public class Plc4XEndpoint extends DefaultEndpoint {
     @UriParam
     @Metadata(required = false)
     @SuppressWarnings("unused")
-    private Class dataType;
+    private Class dataType ;
 
     private final PlcDriverManager plcDriverManager;
+    private  PlcConnection connection;
+    private String uri;
 
     public Plc4XEndpoint(String endpointUri, Component component) {
         super(endpointUri, component);
-        plcDriverManager = new PlcDriverManager();
+        plcDriverManager= new PlcDriverManager();
+        uri = endpointUri;
+        //Here I established the connection in the endpoint, as it is created once during the context
+        // to avoid disconnecting and reconnecting for every request
+        try {
+            String plc4xURI = uri.replaceFirst("plc4x:/?/?", "");
+            connection = plcDriverManager.getConnection(plc4xURI);
+        } catch (PlcConnectionException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public PlcConnection getConnection() {
+        return connection;
     }
 
+
     @Override
     public Producer createProducer() throws Exception {
+        //Checking if connection is still up and reconnecting if not
+        if(!connection.isConnected()){
+            try{
+                connection= plcDriverManager.getConnection(uri.replaceFirst("plc4x:/?/?", ""));
+            }
+            catch (Exception e){
+                e.printStackTrace();
+            }
+        }
         return new Plc4XProducer(this);
     }
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
+        //Checking if connection is still up and reconnecting if not
+        if(!connection.isConnected()){
+            try{
+                connection= plcDriverManager.getConnection(uri.replaceFirst("plc4x:/?/?", ""));
+            }
+            catch (Exception e){
+                e.printStackTrace();
+            }
+        }
         return new Plc4XConsumer(this, processor);
     }
 
     @Override
     public PollingConsumer createPollingConsumer() throws Exception {
+        //Checking if connection is still up and reconnecting if not
+        if(!connection.isConnected()){
+            try{
+                connection= plcDriverManager.getConnection(uri.replaceFirst("plc4x:/?/?", ""));
+            }
+            catch (Exception e){
+                e.printStackTrace();
+            }
+        }
         return new Plc4XPollingConsumer(this);
     }
 
@@ -94,12 +141,12 @@ public class Plc4XEndpoint extends DefaultEndpoint {
         this.driver = driver;
     }
 
-    public String getAddress() {
-        return address;
+    public List<TagData> getTags() {
+        return tags;
     }
 
-    public void setAddress(String address) {
-        this.address = address;
+    public void setTags(List<TagData> tags) {
+        this.tags = tags;
     }
 
     public Class getDataType() {
@@ -123,14 +170,29 @@ public class Plc4XEndpoint extends DefaultEndpoint {
         }
         Plc4XEndpoint that = (Plc4XEndpoint) o;
         return Objects.equals(getDriver(), that.getDriver()) &&
-            Objects.equals(getAddress(), that.getAddress()) &&
+            Objects.equals(getTags(), that.getTags()) &&
             Objects.equals(getDataType(), that.getDataType()) &&
             Objects.equals(getPlcDriverManager(), that.getPlcDriverManager());
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(super.hashCode(), getDriver(), getAddress(), getDataType(), getPlcDriverManager());
+        return Objects.hash(super.hashCode(), getDriver(), getTags(), getDataType(), getPlcDriverManager());
+    }
+
+    @Override
+    public void doStop(){
+        //Shutting down the connection when leaving the Context
+        try{
+            if(connection!=null){
+                if(connection.isConnected()){
+                    connection.close();
+                }
+            }
+        }
+        catch (Exception e){
+            e.printStackTrace();
+        }
     }
 
 }
diff --git a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XPollingConsumer.java b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XPollingConsumer.java
index 84e54bb..719cc11 100644
--- a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XPollingConsumer.java
+++ b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XPollingConsumer.java
@@ -32,7 +32,7 @@ import org.apache.plc4x.java.api.messages.PlcReadResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
+import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -47,13 +47,14 @@ public class Plc4XPollingConsumer extends ServiceSupport implements PollingConsu
     private PlcReadRequest.Builder requestBuilder;
     private Class dataType;
 
+    //private int request =0;
+
     public Plc4XPollingConsumer(Plc4XEndpoint endpoint) throws PlcException {
         this.endpoint = endpoint;
         this.dataType = endpoint.getDataType();
         this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass());
         String plc4xURI = endpoint.getEndpointUri().replaceFirst("plc4x:/?/?", "");
-        this.plcConnection = endpoint.getPlcDriverManager().getConnection(plc4xURI);
-        this.requestBuilder = plcConnection.readRequestBuilder();
+        this.plcConnection = endpoint.getConnection();
     }
 
     @Override
@@ -77,10 +78,21 @@ public class Plc4XPollingConsumer extends ServiceSupport implements PollingConsu
     @Override
     public Exchange receive() {
         Exchange exchange = endpoint.createExchange();
-        CompletableFuture<? extends PlcReadResponse> read = createReadRequest().execute();
         try {
-            PlcReadResponse plcReadResponse = read.get();
-            exchange.getIn().setBody(unwrapIfSingle(plcReadResponse.getAllObjects("default")));
+            PlcReadResponse read = createReadRequest().execute().get();
+            if(endpoint.getTags().size()==1) {
+                TagData tag = endpoint.getTags().get(0);
+                tag.setValue(read.getAllObjects(tag.getTagName()));
+                exchange.getIn().setBody(tag);
+            }
+            else{
+                List<TagData> values = new ArrayList<>();
+                for(TagData tag : endpoint.getTags()){
+                    tag.setValue(read.getObject(tag.getTagName()));
+                    values.add(tag);
+                }
+                exchange.getIn().setBody(values);
+            }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             exchange.setException(e);
@@ -101,32 +113,49 @@ public class Plc4XPollingConsumer extends ServiceSupport implements PollingConsu
         CompletableFuture<? extends PlcReadResponse> read = createReadRequest().execute();
         try {
             PlcReadResponse plcReadResponse = read.get(timeout, TimeUnit.MILLISECONDS);
-            exchange.getIn().setBody(unwrapIfSingle(plcReadResponse.getAllObjects("default")));
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            exchange.setException(e);
-        } catch (ExecutionException | TimeoutException e) {
-            exchange.setException(e);
-        }
+            if(endpoint.getTags().size()==1) {
+                TagData tag = endpoint.getTags().get(0);
+                tag.setValue(plcReadResponse.getAllObjects(tag.getTagName()));
+                exchange.getIn().setBody(tag);
+            }
+            else{
+                List<TagData> values = new ArrayList<>();
+                for(TagData tag : endpoint.getTags()){
+                    tag.setValue(plcReadResponse.getObject(tag.getTagName()));
+                    values.add(tag);
+                }
+                exchange.getIn().setBody(values);
+            }
+            } catch(InterruptedException e){
+                Thread.currentThread().interrupt();
+                exchange.setException(e);
+            } catch(ExecutionException | TimeoutException e){
+                exchange.setException(e);
+            }
         return exchange;
     }
 
     @Override
     protected void doStart() {
-        // We don't seem to need to do anything special here.
     }
 
     @Override
-    protected void doStop() {
-        try {
-            plcConnection.close();
-        } catch (Exception e) {
-            LOGGER.error("Error closing connection", e);
-        }
+    protected void doStop() throws Exception {
     }
 
+
     private PlcReadRequest createReadRequest() {
-        return requestBuilder.addItem("default", endpoint.getAddress()).build();
+        requestBuilder = plcConnection.readRequestBuilder();
+        if (endpoint.getTags().size()>1){
+            for(TagData tag : endpoint.getTags()){
+                requestBuilder.addItem(tag.getTagName(),tag.getQuery());
+            }
+        }
+        else{
+            TagData tag = endpoint.getTags().get(0);
+            requestBuilder.addItem(tag.getTagName(),tag.getQuery());
+        }
+        return requestBuilder.build();
     }
 
     private Object unwrapIfSingle(Collection collection) {
@@ -141,8 +170,6 @@ public class Plc4XPollingConsumer extends ServiceSupport implements PollingConsu
 
     @Override
     public Processor getProcessor() {
-        // TODO: No idea what to do here ...
         return null;
     }
-
 }
diff --git a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XProducer.java b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XProducer.java
index aa099f6..2dd64ba 100644
--- a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XProducer.java
+++ b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XProducer.java
@@ -24,6 +24,7 @@ import org.apache.camel.Message;
 import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcException;
+import org.apache.plc4x.java.api.exceptions.PlcInvalidFieldException;
 import org.apache.plc4x.java.api.messages.PlcWriteRequest;
 import org.apache.plc4x.java.api.messages.PlcWriteResponse;
 import org.slf4j.Logger;
@@ -34,16 +35,14 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class Plc4XProducer extends DefaultAsyncProducer {
-
-    private static final Logger LOG = LoggerFactory.getLogger(Plc4XProducer.class);
-
+    private final Logger log = LoggerFactory.getLogger(Plc4XProducer.class);
     private PlcConnection plcConnection;
     private AtomicInteger openRequests;
 
     public Plc4XProducer(Plc4XEndpoint endpoint) throws PlcException {
         super(endpoint);
         String plc4xURI = endpoint.getEndpointUri().replaceFirst("plc4x:/?/?", "");
-        plcConnection = endpoint.getPlcDriverManager().getConnection(plc4xURI);
+        this.plcConnection = endpoint.getConnection();
         if (!plcConnection.getMetadata().canWrite()) {
             throw new PlcException("This connection (" + plc4xURI + ") doesn't support writing.");
         }
@@ -53,22 +52,26 @@ public class Plc4XProducer extends DefaultAsyncProducer {
     @Override
     public void process(Exchange exchange) throws Exception {
         Message in = exchange.getIn();
-        String fieldName = in.getHeader(Constants.FIELD_NAME_HEADER, String.class);
-        String fieldQuery = in.getHeader(Constants.FIELD_QUERY_HEADER, String.class);
         Object body = in.getBody();
-        if (body instanceof List) {
-            List<?> bodyList = in.getBody(List.class);
-            Object[] values = bodyList.toArray();
-//            builder.addItem(fieldName, fieldQuery, values);
-        } else {
-            Object value = in.getBody(Object.class);
-//            builder.addItem(fieldName, fieldQuery, value);
-        }
         PlcWriteRequest.Builder builder = plcConnection.writeRequestBuilder();
+        if (body instanceof List) { //Check if we have a List
+            if(((List) body).get(0) instanceof TagData){    //Check if this List contains TagData
+                List<TagData> tags =(List<TagData>) body;
+                for(TagData tag : tags){
+                    builder.addItem(tag.getTagName(),tag.getQuery(),tag.getValue());
+                }
+            }
+            else {
+                throw new PlcInvalidFieldException("Parameter 'tags' has to be a List of TagData");
+            }
+        }
+        else {
+            throw new PlcInvalidFieldException("Parameter 'tags' has to be a List");
+        }
         CompletableFuture<? extends PlcWriteResponse> completableFuture = builder.build().execute();
         int currentlyOpenRequests = openRequests.incrementAndGet();
         try {
-            LOG.debug("Currently open requests including {}:{}", exchange, currentlyOpenRequests);
+            log.debug("Currently open requests including {}:{}", exchange, currentlyOpenRequests);
             Object plcWriteResponse = completableFuture.get();
             if (exchange.getPattern().isOutCapable()) {
                 Message out = exchange.getOut();
@@ -79,7 +82,7 @@ public class Plc4XProducer extends DefaultAsyncProducer {
             }
         } finally {
             int openRequestsAfterFinish = openRequests.decrementAndGet();
-            LOG.trace("Open Requests after {}:{}", exchange, openRequestsAfterFinish);
+            log.trace("Open Requests after {}:{}", exchange, openRequestsAfterFinish);
         }
     }
 
@@ -100,16 +103,10 @@ public class Plc4XProducer extends DefaultAsyncProducer {
     @Override
     protected void doStop() throws Exception {
         int openRequestsAtStop = openRequests.get();
-        LOG.debug("Stopping with {} open requests", openRequestsAtStop);
+        log.debug("Stopping with {} open requests", openRequestsAtStop);
         if (openRequestsAtStop > 0) {
-            LOG.warn("There are still {} open requests", openRequestsAtStop);
-        }
-        try {
-            plcConnection.close();
-        } catch (Exception e) {
-            LOG.warn("Could not close {}", plcConnection, e);
+            log.warn("There are still {} open requests", openRequestsAtStop);
         }
-        super.doStop();
     }
 
 }
diff --git a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XComponent.java b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/TagData.java
similarity index 50%
copy from plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XComponent.java
copy to plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/TagData.java
index 71a252d..b2bc820 100644
--- a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XComponent.java
+++ b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/TagData.java
@@ -18,24 +18,44 @@ under the License.
 */
 package org.apache.plc4x.camel;
 
-import org.apache.camel.Endpoint;
-import org.apache.camel.support.DefaultComponent;
+public class TagData {
+    private String tagName;
+    private String query;
+    private Object value;
+
+    public TagData(String alias, String query, Object value) {
+        this.tagName = alias;
+        this.query = query;
+        this.value = value;
+    }
 
-import java.util.Map;
+    public TagData(String tagName, String query) {
+        this.tagName = tagName;
+        this.query = query;
+    }
 
-public class Plc4XComponent extends DefaultComponent {
+    public String getTagName() {
+        return tagName;
+    }
+
+    public void setTagName(String tagName) {
+        this.tagName = tagName;
+    }
+
+    public String getQuery() {
+        return query;
+    }
+
+    public void setQuery(String query) {
+        this.query = query;
+    }
 
-    @Override
-    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
-        Endpoint endpoint = new Plc4XEndpoint(uri, this);
-        setProperties(endpoint, parameters);
-        return endpoint;
+    public Object getValue() {
+        return value;
     }
 
-    @Override
-    protected void afterConfiguration(String uri, String remaining, Endpoint endpoint, Map<String, Object> parameters) {
-        Plc4XEndpoint plc4XEndpoint = (Plc4XEndpoint) endpoint;
-        plc4XEndpoint.setDriver(remaining.split(":")[0]);
+    public void setValue(Object value) {
+        this.value = value;
     }
 
-}
\ No newline at end of file
+}
diff --git a/plc4j/integrations/apache-camel/src/main/resources/META-INF/services/org/apache/camel/component/plc4x b/plc4j/integrations/apache-camel/src/main/resources/META-INF/services/org/apache/camel/component/plc4x
index 9f443db..7188880 100644
--- a/plc4j/integrations/apache-camel/src/main/resources/META-INF/services/org/apache/camel/component/plc4x
+++ b/plc4j/integrations/apache-camel/src/main/resources/META-INF/services/org/apache/camel/component/plc4x
@@ -1,19 +1,19 @@
 #
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-class=org.apache.plc4x.camel.Plc4XComponent
\ No newline at end of file
+                                           # Licensed to the Apache Software Foundation (ASF) under one
+                                           # or more contributor license agreements.  See the NOTICE file
+                                           # distributed with this work for additional information
+                                           # regarding copyright ownership.  The ASF licenses this file
+                                           # to you under the Apache License, Version 2.0 (the
+                                           # "License"); you may not use this file except in compliance
+                                           # with the License.  You may obtain a copy of the License at
+                                           #
+                                           #   http://www.apache.org/licenses/LICENSE-2.0
+                                           #
+                                           # Unless required by applicable law or agreed to in writing,
+                                           # software distributed under the License is distributed on an
+                                           # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+                                           # KIND, either express or implied.  See the License for the
+                                           # specific language governing permissions and limitations
+                                           # under the License.
+                                           #
+                                           class=org.apache.plc4x.camel.Plc4XComponent
\ No newline at end of file
diff --git a/plc4j/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XComponentTest.java b/plc4j/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XComponentTest.java
index d7da472..168e2e5 100644
--- a/plc4j/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XComponentTest.java
+++ b/plc4j/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XComponentTest.java
@@ -18,13 +18,17 @@ under the License.
 */
 package org.apache.plc4x.camel;
 
+import org.apache.camel.Expression;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.junit5.CamelTestSupport;
 import org.apache.plc4x.java.api.model.PlcField;
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 public class Plc4XComponentTest extends CamelTestSupport {
@@ -35,8 +39,8 @@ public class Plc4XComponentTest extends CamelTestSupport {
         mock.expectedMinimumMessageCount(1);
         mock.expectedMessageCount(2);
 
-        template.asyncSendBody("direct:plc4x", "irrelevant");
-        template.asyncSendBody("direct:plc4x2", "irrelevant");
+        template.asyncSendBody("direct:plc4x", Collections.singletonList("irrelevant"));
+        template.asyncSendBody("direct:plc4x2", Collections.singletonList("irrelevant"));
 
         assertMockEndpointsSatisfied(2, TimeUnit.SECONDS);
     }
@@ -45,19 +49,20 @@ public class Plc4XComponentTest extends CamelTestSupport {
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
+                List<TagData> tags = new ArrayList<>();
+                tags.add(new TagData("testTagName","testTagAddress"));
+                tags.add(new TagData("testTagName2","testTagAddress2"));
+                Plc4XEndpoint producer = getContext().getEndpoint("plc4x:mock:10.10.10.1/1/1", Plc4XEndpoint.class);
+                producer.setTags(tags);
                 from("direct:plc4x")
-                    .setHeader(Constants.FIELD_QUERY_HEADER, constant(new PlcField() {
-                    }))
-                    .setBody(constant((byte) 0x0))
+                    .setBody(constant(Arrays.asList(new TagData("test","testAddress",false))))
                     .to("plc4x:mock:10.10.10.1/1/1")
                     .to("mock:result");
                 from("direct:plc4x2")
-                    .setHeader(Constants.FIELD_QUERY_HEADER, constant(new PlcField() {
-                    }))
-                    .setBody(constant(Arrays.asList((byte) 0x0, (byte) 0x1, (byte) 0x2, (byte) 0x3)))
+                    .setBody(constant(Arrays.asList(new TagData("test2","testAddress2",0x05))))
                     .to("plc4x:mock:10.10.10.1/1/1")
                     .to("mock:result");
-                from("plc4x:mock:10.10.10.1/1/1?address=Main.by0&dataType=java.lang.String")
+                from(producer)
                     .log("Got ${body}");
             }
         };
diff --git a/plc4j/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XConsumerTest.java b/plc4j/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XConsumerTest.java
index 60e594d..a28ad44 100644
--- a/plc4j/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XConsumerTest.java
+++ b/plc4j/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XConsumerTest.java
@@ -23,21 +23,6 @@ import org.junit.jupiter.api.Test;
 // TODO: implement me
 public class Plc4XConsumerTest {
 
-    @Test
-    public void toStringTest() {
-    }
-
-    @Test
-    public void getEndpoint() {
-    }
-
-    @Test
-    public void getExceptionHandler() {
-    }
-
-    @Test
-    public void setExceptionHandler() {
-    }
 
     @Test
     public void doStart() {
@@ -47,7 +32,4 @@ public class Plc4XConsumerTest {
     public void doStop() {
     }
 
-    @Test
-    public void accept() {
-    }
 }
\ No newline at end of file
diff --git a/plc4j/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XProducerTest.java b/plc4j/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XProducerTest.java
index 636ae9d..a205b10 100644
--- a/plc4j/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XProducerTest.java
+++ b/plc4j/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XProducerTest.java
@@ -20,7 +20,6 @@ package org.apache.plc4x.camel;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
-import org.apache.plc4x.java.PlcDriverManager;
 import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.messages.PlcWriteRequest;
 import org.junit.jupiter.api.BeforeEach;
@@ -28,7 +27,6 @@ import org.junit.jupiter.api.Test;
 
 import java.lang.reflect.Field;
 import java.util.Arrays;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.mockito.Mockito.*;
@@ -43,21 +41,21 @@ public class Plc4XProducerTest {
     public void setUp() throws Exception {
         Plc4XEndpoint endpointMock = mock(Plc4XEndpoint.class, RETURNS_DEEP_STUBS);
         when(endpointMock.getEndpointUri()).thenReturn("plc4x:mock:10.10.10.1/1/1");
-        PlcDriverManager plcDriverManagerMock = mock(PlcDriverManager.class, RETURNS_DEEP_STUBS);
+        PlcConnection mockConnection = mock(PlcConnection.class, RETURNS_DEEP_STUBS);
 
-        when(plcDriverManagerMock.getConnection(anyString()).getMetadata().canRead()).thenReturn(true);
-        when(plcDriverManagerMock.getConnection(anyString()).getMetadata().canWrite()).thenReturn(true);
-        when(plcDriverManagerMock.getConnection(anyString()).writeRequestBuilder())
+        when(mockConnection.getMetadata().canRead()).thenReturn(true);
+        when(mockConnection.getMetadata().canWrite()).thenReturn(true);
+        when(mockConnection.writeRequestBuilder())
             .thenReturn(mock(PlcWriteRequest.Builder.class, RETURNS_DEEP_STUBS));
 
-        when(endpointMock.getPlcDriverManager()).thenReturn(plcDriverManagerMock);
+        when(endpointMock.getConnection()).thenReturn(mockConnection);
         SUT = new Plc4XProducer(endpointMock);
         testExchange = mock(Exchange.class, RETURNS_DEEP_STUBS);
 
-        when(testExchange.getIn().getHeader(eq(Constants.FIELD_NAME_HEADER), eq(String.class)))
-            .thenReturn("Hurz");
-        when(testExchange.getIn().getHeader(eq(Constants.FIELD_QUERY_HEADER), eq(String.class)))
-            .thenReturn("PlcField.class");
+        when(testExchange.getIn().getBody())
+            .thenReturn(Arrays.asList(new TagData("testName","testAddress",0),
+                new TagData("testName2","testAddress2",true),
+                new TagData("testName3","testAddress3","testVal")));
     }
 
     @Test
@@ -66,11 +64,8 @@ public class Plc4XProducerTest {
         SUT.process(testExchange);
         when(testExchange.getPattern()).thenReturn(ExchangePattern.InOut);
         SUT.process(testExchange);
-        when(testExchange.getPattern()).thenReturn(ExchangePattern.InOptionalOut);
-        SUT.process(testExchange);
-        when(testExchange.getIn().getBody()).thenReturn(Arrays.asList("test","test"));
-        when(testExchange.getIn().getBody(eq(List.class))).thenReturn(Arrays.asList("test","test"));
-        SUT.process(testExchange);
+        when(testExchange.getIn().getBody()).thenReturn(2);
+
     }
 
     @Test
@@ -83,9 +78,6 @@ public class Plc4XProducerTest {
         when(testExchange.getPattern()).thenReturn(ExchangePattern.InOut);
         SUT.process(testExchange, doneSync -> {
         });
-        when(testExchange.getPattern()).thenReturn(ExchangePattern.InOptionalOut);
-        SUT.process(testExchange, doneSync -> {
-        });
     }
 
     @Test