You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/02/01 13:19:54 UTC

[camel] branch camel-3.x updated: camel-plc4x: plc4x improvements in camel 3 (CAMEL-18861) (#9282)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.x by this push:
     new d0bdd5ebfc1 camel-plc4x: plc4x improvements in camel 3 (CAMEL-18861) (#9282)
d0bdd5ebfc1 is described below

commit d0bdd5ebfc1290c1d4aad95657153376bada0df5
Author: Stein Overtoom <st...@overtoom.email>
AuthorDate: Wed Feb 1 14:19:47 2023 +0100

    camel-plc4x: plc4x improvements in camel 3 (CAMEL-18861) (#9282)
    
    * camel-plc4x: Connection now established in doStart() of Producer/Consumer instead of constructors (#8967)
    
    Co-authored-by: Stein Overtoom <st...@triopsys.nl>
    
    * camel-plc4x: Added autoReconnect uri parameter (#8971)
    
    * camel-plc4x: Added autoReconnect uri parameter
    
    * camel-plc4x: fix autoReconnect parameter style
    
    * camel-plc4x: autoReconnect parameter, log level to warn (code review)
    
    Co-authored-by: Stein Overtoom <st...@triopsys.nl>
    
    * camel-plc4x Implemented Plc4XPollingConsumer and some refactoring (CAMEL-18861) (#9254)
    
    * camel-plc4x: fixed Plc4XComponentTest unit test (CAMEL-18861)
    
    * camel-plc4x: Implemented Plc4XPollingConsumer (CAMEL-18861)
    
    * camel-plc4x: Refactored building ReadRequest into separate method (CAMEL-18861)
    
    * camel-plc4x: Improved auto-reconnect (CAMEL-18861)
                 - An exchange with an empty map is created when no connection could be made, instead of no exchange at all
                 - Shuts down the endpoint if auto-reconnect is turned off and the connection is lost (upon first request)
    
    * camel-plc4x: Moved connection unit test from producer to endpoint test (CAMEL-18861)
    
    * camel-plc4x: Fixed unit tests, removed reflection from Endpoint and Producer tests and fixed canWrite bug in Producer (CAMEL-18861)
    
    * camel-plc4x: Exceptions are now only logged at trace level (CAMEL-18861)
    
    * camel-plc4x: Updated readme with PollingConsumer (CAMEL-18861)
    
    * camel-plc4x: Fixed formatting after running -Psourcecheck (CAMEL-18861)
    
    * camel-plc4x: Removed unused imports in Plc4XConsumer (CAMEL-18861)
    
    * camel-plc4x: Ran formatter (CAMEL-18861)
    
    ---------
    
    Co-authored-by: Stein Overtoom <st...@triopsys.nl>
    
    * camel-plc4x: Fixed inconsistently passing/failing unit test (CAMEL-18861) (#9280)
    
    Co-authored-by: Stein Overtoom <st...@triopsys.nl>
    
    ---------
    
    Co-authored-by: Stein Overtoom <st...@triopsys.nl>
---
 .../camel-plc4x/src/main/docs/plc4x-component.adoc |   7 +-
 .../camel/component/plc4x/Plc4XConsumer.java       |  53 +++++----
 .../camel/component/plc4x/Plc4XEndpoint.java       | 117 ++++++++++++++++++--
 .../component/plc4x/Plc4XPollingConsumer.java      | 123 +++++++++++++++++++++
 .../camel/component/plc4x/Plc4XProducer.java       |  45 ++++----
 .../camel/component/plc4x/Plc4XComponentTest.java  |   4 +-
 .../camel/component/plc4x/Plc4XEndpointTest.java   |   9 ++
 .../camel/component/plc4x/Plc4XProducerTest.java   |  29 +----
 8 files changed, 298 insertions(+), 89 deletions(-)

diff --git a/components/camel-plc4x/src/main/docs/plc4x-component.adoc b/components/camel-plc4x/src/main/docs/plc4x-component.adoc
index 87cedd0bec9..7bf4450d524 100644
--- a/components/camel-plc4x/src/main/docs/plc4x-component.adoc
+++ b/components/camel-plc4x/src/main/docs/plc4x-component.adoc
@@ -63,12 +63,15 @@ where `$\{camel-version}` must be replaced by the actual version of Camel.
 
 
 == Consumer
-The consumer supports one-time reading or Triggered Reading. (_Schedulded Reading using Period only soon_).To read from
+The consumer supports one-time reading or Triggered Reading. To read from
 the PLC, use a  `Map<String,String>` containing the Alias and Queries for the Data you want.
 
-The Body create by the Consumer will be a `Map<String,Object>` containing the Aliases and there associated value
+The Body created by the Consumer will be a `Map<String,Object>` containing the Aliases and their associated values
 read from the PLC.
 
+== Polling Consumer
+The polling consumer supports consecutive reading. The input and output are the same as for the regular consumer.
+
 == Producer
 To write data to the PLC, we also use a `Map`. The difference with the Producer is that the `Value` of the Map has also to
 be a Map. Also, this `Map` has to be set into the `Body` of the `Message`
diff --git a/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XConsumer.java b/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XConsumer.java
index 69bd8f56e8b..7f1607eebec 100644
--- a/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XConsumer.java
+++ b/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XConsumer.java
@@ -29,9 +29,7 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.support.DefaultConsumer;
-import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.api.exceptions.PlcIncompatibleDatatypeException;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.scraper.config.JobConfigurationImpl;
 import org.apache.plc4x.java.scraper.config.ScraperConfiguration;
@@ -46,7 +44,6 @@ import org.slf4j.LoggerFactory;
 public class Plc4XConsumer extends DefaultConsumer {
     private static final Logger LOGGER = LoggerFactory.getLogger(Plc4XConsumer.class);
 
-    private PlcConnection plcConnection;
     private final Map<String, Object> tags;
     private final String trigger;
     private final Plc4XEndpoint plc4XEndpoint;
@@ -74,9 +71,15 @@ public class Plc4XConsumer extends DefaultConsumer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        this.plcConnection = plc4XEndpoint.getConnection();
-        if (!plcConnection.isConnected()) {
-            plc4XEndpoint.reconnect();
+        try {
+            plc4XEndpoint.setupConnection();
+        } catch (PlcConnectionException e) {
+            if (LOGGER.isTraceEnabled()) {
+                LOGGER.error("Connection setup failed, stopping Consumer", e);
+            } else {
+                LOGGER.error("Connection setup failed, stopping Consumer");
+            }
+            doStop();
         }
         if (trigger == null) {
             startUnTriggered();
@@ -86,25 +89,19 @@ public class Plc4XConsumer extends DefaultConsumer {
     }
 
     private void startUnTriggered() {
-        if (plc4XEndpoint.isAutoReconnect() && !plcConnection.isConnected()) {
-            try {
-                plc4XEndpoint.reconnect();
-                LOGGER.debug("Successfully reconnected");
-            } catch (PlcConnectionException e) {
+        try {
+            plc4XEndpoint.reconnectIfNeeded();
+        } catch (PlcConnectionException e) {
+            if (LOGGER.isTraceEnabled()) {
                 LOGGER.warn("Unable to reconnect, skipping request", e);
-                return;
-            }
-        }
-        PlcReadRequest.Builder builder = plcConnection.readRequestBuilder();
-        for (Map.Entry<String, Object> tag : tags.entrySet()) {
-            try {
-                builder.addItem(tag.getKey(), (String) tag.getValue());
-            } catch (PlcIncompatibleDatatypeException e) {
-                LOGGER.error("For consumer, please use Map<String,String>, currently using {}",
-                        tags.getClass().getSimpleName());
+            } else {
+                LOGGER.warn("Unable to reconnect, skipping request");
             }
+            return;
         }
-        PlcReadRequest request = builder.build();
+
+        PlcReadRequest request = plc4XEndpoint.buildPlcReadRequest();
+
         future = executorService.schedule(() -> request.execute().thenAccept(response -> {
             try {
                 Exchange exchange = plc4XEndpoint.createExchange();
@@ -126,15 +123,17 @@ public class Plc4XConsumer extends DefaultConsumer {
 
         TriggeredScraperImpl scraper = new TriggeredScraperImpl(configuration, (job, alias, response) -> {
             try {
-                if (plc4XEndpoint.isAutoReconnect() && !plcConnection.isConnected()) {
-                    plc4XEndpoint.reconnect();
-                    LOGGER.debug("Successfully reconnected");
-                }
+                plc4XEndpoint.reconnectIfNeeded();
+
                 Exchange exchange = plc4XEndpoint.createExchange();
                 exchange.getIn().setBody(response);
                 getProcessor().process(exchange);
             } catch (PlcConnectionException e) {
-                LOGGER.warn("Unable to reconnect, skipping request", e);
+                if (LOGGER.isTraceEnabled()) {
+                    LOGGER.warn("Unable to reconnect, skipping request", e);
+                } else {
+                    LOGGER.warn("Unable to reconnect, skipping request");
+                }
             } catch (Exception e) {
                 getExceptionHandler().handleException(e);
             }
diff --git a/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XEndpoint.java b/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XEndpoint.java
index 7a1a70b2118..1d8a51c639b 100644
--- a/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XEndpoint.java
+++ b/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XEndpoint.java
@@ -22,6 +22,7 @@ import java.util.Objects;
 import org.apache.camel.Category;
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
+import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.spi.Metadata;
@@ -32,7 +33,12 @@ import org.apache.camel.support.DefaultEndpoint;
 import org.apache.plc4x.java.PlcDriverManager;
 import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.exceptions.PlcIncompatibleDatatypeException;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcWriteRequest;
 import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Read and write to PLC devices
@@ -40,6 +46,10 @@ import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
 @UriEndpoint(scheme = "plc4x", firstVersion = "3.20.0", title = "PLC4X",
              syntax = "plc4x:driver", category = Category.IOT)
 public class Plc4XEndpoint extends DefaultEndpoint {
+    private static final Logger LOGGER = LoggerFactory.getLogger(Plc4XEndpoint.class);
+
+    protected PlcDriverManager plcDriverManager;
+    protected PlcConnection connection;
 
     @UriPath
     @Metadata(required = true, description = "PLC4X connection string for the connection to the target")
@@ -58,8 +68,6 @@ public class Plc4XEndpoint extends DefaultEndpoint {
     @Metadata(description = "Whether to reconnect when no connection is present upon doing a request")
     private boolean autoReconnect;
 
-    private PlcDriverManager plcDriverManager;
-    private PlcConnection connection;
     private String uri;
 
     public Plc4XEndpoint(String endpointUri, Component component) {
@@ -97,15 +105,63 @@ public class Plc4XEndpoint extends DefaultEndpoint {
         return autoReconnect;
     }
 
-    public PlcConnection getConnection() throws PlcConnectionException {
-        if (this.connection == null) {
-            this.connection = plcDriverManager.getConnection(this.uri);
+    /**
+     * Set up the connection.
+     * <p>
+     *
+     * @throws PlcConnectionException if no connection could be established and auto-reconnect is turned off
+     */
+    public void setupConnection() throws PlcConnectionException {
+        try {
+            connection = plcDriverManager.getConnection(this.uri);
+            if (!connection.isConnected()) {
+                reconnectIfNeeded();
+            }
+        } catch (PlcConnectionException e) {
+            if (isAutoReconnect()) {
+                if (LOGGER.isTraceEnabled()) {
+                    LOGGER.warn("Could not connect during setup, retrying on next request", e);
+                } else {
+                    LOGGER.warn("Could not connect during setup, retrying on next request");
+                }
+            } else {
+                LOGGER.warn("Could not connect during setup and auto reconnect is turned off");
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * Reconnects if needed. If connection is lost and auto-reconnect is turned off, endpoint will be shutdown.
+     * <p>
+     *
+     * @throws PlcConnectionException If reconnect failed and auto-reconnect is turned on
+     */
+    public void reconnectIfNeeded() throws PlcConnectionException {
+        if (connection != null && connection.isConnected()) {
+            LOGGER.trace("No reconnect needed, already connected");
+        } else if (autoReconnect && connection == null) {
+            connection = plcDriverManager.getConnection(uri);
+            LOGGER.debug("Successfully reconnected");
+        } else if (autoReconnect && !connection.isConnected()) {
+            connection.connect();
+            // If reconnection fails without Exception, reset connection
+            if (!connection.isConnected()) {
+                LOGGER.debug("No connection established after connect, resetting connection");
+                connection = plcDriverManager.getConnection(uri);
+            }
+            LOGGER.debug("Successfully reconnected");
+        } else {
+            LOGGER.warn("Connection lost and auto-reconnect is turned off, shutting down Plc4XEndpoint");
+            stop();
         }
-        return connection;
     }
 
-    public void reconnect() throws PlcConnectionException {
-        connection.connect();
+    /**
+     * @return true if connection supports writing, else false
+     */
+    public boolean canWrite() {
+        return connection.getMetadata().canWrite();
     }
 
     @Override
@@ -120,6 +176,51 @@ public class Plc4XEndpoint extends DefaultEndpoint {
         return consumer;
     }
 
+    @Override
+    public PollingConsumer createPollingConsumer() {
+        LOGGER.debug("Creating Plc4XPollingConsumer");
+        return new Plc4XPollingConsumer(this);
+    }
+
+    /**
+     * Build a {@link PlcReadRequest} using the tags specified in the endpoint.
+     * <p>
+     *
+     * @return {@link PlcReadRequest}
+     */
+    public PlcReadRequest buildPlcReadRequest() {
+        PlcReadRequest.Builder builder = connection.readRequestBuilder();
+        for (Map.Entry<String, Object> tag : tags.entrySet()) {
+            try {
+                builder.addItem(tag.getKey(), (String) tag.getValue());
+            } catch (PlcIncompatibleDatatypeException e) {
+                LOGGER.error("For consumer, please use Map<String,String>, currently using {}",
+                        tags.getClass().getSimpleName());
+            }
+        }
+        return builder.build();
+    }
+
+    /**
+     * Build a {@link PlcWriteRequest}.
+     * <p>
+     *
+     * @param  tags tags to add to write request
+     * @return      {@link PlcWriteRequest}
+     */
+    public PlcWriteRequest buildPlcWriteRequest(Map<String, Map<String, Object>> tags) {
+        PlcWriteRequest.Builder builder = connection.writeRequestBuilder();
+
+        for (Map.Entry<String, Map<String, Object>> entry : tags.entrySet()) {
+            //Tags are stored like this --> Map<Tagname,Map<Query,Value>> for writing
+            String name = entry.getKey();
+            String query = entry.getValue().keySet().iterator().next();
+            Object value = entry.getValue().get(query);
+            builder.addItem(name, query, value);
+        }
+        return builder.build();
+    }
+
     public PlcDriverManager getPlcDriverManager() {
         return plcDriverManager;
     }
diff --git a/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XPollingConsumer.java b/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XPollingConsumer.java
new file mode 100644
index 00000000000..edc83475e0d
--- /dev/null
+++ b/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XPollingConsumer.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.plc4x;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.EventDrivenPollingConsumer;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Plc4XPollingConsumer extends EventDrivenPollingConsumer {
+    private static final Logger LOGGER = LoggerFactory.getLogger(Plc4XPollingConsumer.class);
+
+    private final Plc4XEndpoint plc4XEndpoint;
+
+    public Plc4XPollingConsumer(Plc4XEndpoint endpoint) {
+        super(endpoint);
+        this.plc4XEndpoint = endpoint;
+    }
+
+    @Override
+    public String toString() {
+        return "Plc4XPollingConsumer[" + plc4XEndpoint + "]";
+    }
+
+    @Override
+    public Endpoint getEndpoint() {
+        return plc4XEndpoint;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        try {
+            plc4XEndpoint.setupConnection();
+        } catch (PlcConnectionException e) {
+            if (LOGGER.isTraceEnabled()) {
+                LOGGER.error("Connection setup failed, stopping PollingConsumer", e);
+            } else {
+                LOGGER.error("Connection setup failed, stopping PollingConsumer");
+            }
+            doStop();
+        }
+    }
+
+    @Override
+    public Exchange receive() {
+        return doReceive(-1);
+    }
+
+    @Override
+    public Exchange receiveNoWait() {
+        return doReceive(0);
+    }
+
+    @Override
+    public Exchange receive(long timeout) {
+        return doReceive(timeout);
+    }
+
+    protected Exchange doReceive(long timeout) {
+        Exchange exchange = plc4XEndpoint.createExchange();
+        try {
+            plc4XEndpoint.reconnectIfNeeded();
+
+            PlcReadRequest request = plc4XEndpoint.buildPlcReadRequest();
+            CompletableFuture<? extends PlcReadResponse> future
+                    = request.execute().whenComplete((plcReadResponse, throwable) -> {
+                    });
+            PlcReadResponse response;
+            if (timeout >= 0) {
+                response = future.get(timeout, TimeUnit.MILLISECONDS);
+            } else {
+                response = future.get();
+            }
+
+            Map<String, Object> rsp = new HashMap<>();
+            for (String field : response.getFieldNames()) {
+                rsp.put(field, response.getObject(field));
+            }
+            exchange.getIn().setBody(rsp);
+        } catch (ExecutionException | TimeoutException e) {
+            getExceptionHandler().handleException(e);
+            exchange.getIn().setBody(new HashMap<>());
+        } catch (InterruptedException e) {
+            getExceptionHandler().handleException(e);
+            Thread.currentThread().interrupt();
+        } catch (PlcConnectionException e) {
+            if (LOGGER.isTraceEnabled()) {
+                LOGGER.warn("Unable to reconnect, skipping request", e);
+            } else {
+                LOGGER.warn("Unable to reconnect, skipping request");
+            }
+            exchange.getIn().setBody(new HashMap<>());
+        }
+        return exchange;
+    }
+
+}
diff --git a/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XProducer.java b/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XProducer.java
index 77ca0244d15..7db6b96ac9c 100644
--- a/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XProducer.java
+++ b/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XProducer.java
@@ -25,7 +25,6 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.support.DefaultAsyncProducer;
-import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.exceptions.PlcException;
 import org.apache.plc4x.java.api.exceptions.PlcInvalidFieldException;
@@ -35,9 +34,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class Plc4XProducer extends DefaultAsyncProducer {
+    protected AtomicInteger openRequests;
     private final Logger log = LoggerFactory.getLogger(Plc4XProducer.class);
-    private PlcConnection plcConnection;
-    private AtomicInteger openRequests;
     private final Plc4XEndpoint plc4XEndpoint;
 
     public Plc4XProducer(Plc4XEndpoint endpoint) {
@@ -49,43 +47,44 @@ public class Plc4XProducer extends DefaultAsyncProducer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        this.plcConnection = plc4XEndpoint.getConnection();
-        if (!plcConnection.isConnected()) {
-            plc4XEndpoint.reconnect();
+        try {
+            plc4XEndpoint.setupConnection();
+        } catch (PlcConnectionException e) {
+            if (log.isTraceEnabled()) {
+                log.error("Connection setup failed, stopping producer", e);
+            } else {
+                log.error("Connection setup failed, stopping producer");
+            }
+            doStop();
         }
-        if (!plcConnection.getMetadata().canWrite()) {
+        if (!plc4XEndpoint.canWrite()) {
             throw new PlcException("This connection (" + plc4XEndpoint.getUri() + ") doesn't support writing.");
         }
     }
 
     @Override
     public void process(Exchange exchange) throws Exception {
-        if (plc4XEndpoint.isAutoReconnect() && !plcConnection.isConnected()) {
-            try {
-                plc4XEndpoint.reconnect();
-                log.debug("Successfully reconnected");
-            } catch (PlcConnectionException e) {
+        try {
+            plc4XEndpoint.reconnectIfNeeded();
+        } catch (PlcConnectionException e) {
+            if (log.isTraceEnabled()) {
                 log.warn("Unable to reconnect, skipping request", e);
-                return;
+            } else {
+                log.warn("Unable to reconnect, skipping request");
             }
+            return;
         }
+
         Message in = exchange.getIn();
         Object body = in.getBody();
-        PlcWriteRequest.Builder builder = plcConnection.writeRequestBuilder();
+        PlcWriteRequest plcWriteRequest;
         if (body instanceof Map) { //Check if we have a Map
             Map<String, Map<String, Object>> tags = (Map<String, Map<String, Object>>) body;
-            for (Map.Entry<String, Map<String, Object>> entry : tags.entrySet()) {
-                //Tags are stored like this --> Map<Tagname,Map<Query,Value>> for writing
-                String name = entry.getKey();
-                String query = entry.getValue().keySet().iterator().next();
-                Object value = entry.getValue().get(query);
-                builder.addItem(name, query, value);
-            }
+            plcWriteRequest = plc4XEndpoint.buildPlcWriteRequest(tags);
         } else {
             throw new PlcInvalidFieldException("The body must contain a Map<String,Map<String,Object>");
         }
-
-        CompletableFuture<? extends PlcWriteResponse> completableFuture = builder.build().execute();
+        CompletableFuture<? extends PlcWriteResponse> completableFuture = plcWriteRequest.execute();
         int currentlyOpenRequests = openRequests.incrementAndGet();
         try {
             log.debug("Currently open requests including {}:{}", exchange, currentlyOpenRequests);
diff --git a/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XComponentTest.java b/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XComponentTest.java
index 28027ee6c51..749dd89ca20 100644
--- a/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XComponentTest.java
+++ b/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XComponentTest.java
@@ -31,7 +31,6 @@ public class Plc4XComponentTest extends CamelTestSupport {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMinimumMessageCount(1);
         mock.expectedMessageCount(2);
-
         template.asyncSendBody("direct:plc4x", Collections.singletonList("irrelevant"));
         template.asyncSendBody("direct:plc4x2", Collections.singletonList("irrelevant"));
 
@@ -41,11 +40,12 @@ public class Plc4XComponentTest extends CamelTestSupport {
     @Override
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
-            public void configure() {
+            public void configure() throws Exception {
                 Map<String, Object> tags = new HashMap<>();
                 tags.put("Test1", "%TestQuery");
                 Plc4XEndpoint producer = getContext().getEndpoint("plc4x:mock:10.10.10.1/1/1", Plc4XEndpoint.class);
                 producer.setTags(tags);
+                producer.setAutoReconnect(true);
                 from("direct:plc4x")
                         .setBody(constant(Collections.singletonMap("test", Collections.singletonMap("testAddress", false))))
                         .to("plc4x:mock:10.10.10.1/1/1")
diff --git a/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XEndpointTest.java b/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XEndpointTest.java
index c72002835a5..7d84236536a 100644
--- a/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XEndpointTest.java
+++ b/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XEndpointTest.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.plc4x;
 import org.apache.camel.Component;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.plc4x.java.api.PlcConnection;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -54,4 +55,12 @@ public class Plc4XEndpointTest {
         assertThat(sut.isSingleton(), is(true));
     }
 
+    @Test
+    public void doStopBadConnection() throws Exception {
+        PlcConnection plcConnectionMock = mock(PlcConnection.class);
+        sut.connection = plcConnectionMock;
+        doThrow(new RuntimeException("oh noes")).when(plcConnectionMock).close();
+        sut.doStop();
+    }
+
 }
diff --git a/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XProducerTest.java b/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XProducerTest.java
index 35250d6944d..cfc6926c60d 100644
--- a/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XProducerTest.java
+++ b/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XProducerTest.java
@@ -16,16 +16,12 @@
  */
 package org.apache.camel.component.plc4x;
 
-import java.lang.reflect.Field;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
-import org.apache.plc4x.java.api.PlcConnection;
-import org.apache.plc4x.java.api.messages.PlcWriteRequest;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -41,16 +37,9 @@ public class Plc4XProducerTest {
     public void setUp() throws Exception {
         Plc4XEndpoint endpointMock = mock(Plc4XEndpoint.class, RETURNS_DEEP_STUBS);
         when(endpointMock.getEndpointUri()).thenReturn("plc4x:mock:10.10.10.1/1/1");
-        PlcConnection mockConnection = mock(PlcConnection.class, RETURNS_DEEP_STUBS);
+        when(endpointMock.canWrite()).thenReturn(true);
 
-        when(mockConnection.getMetadata().canRead()).thenReturn(true);
-        when(mockConnection.getMetadata().canWrite()).thenReturn(true);
-        when(mockConnection.writeRequestBuilder())
-                .thenReturn(mock(PlcWriteRequest.Builder.class, RETURNS_DEEP_STUBS));
-
-        when(endpointMock.getConnection()).thenReturn(mockConnection);
         sut = new Plc4XProducer(endpointMock);
-        sut.doStart();
         testExchange = mock(Exchange.class, RETURNS_DEEP_STUBS);
         Map<String, Map<String, Object>> tags = new HashMap();
         tags.put("test1", Collections.singletonMap("testAddress1", 0));
@@ -89,21 +78,7 @@ public class Plc4XProducerTest {
 
     @Test
     public void doStopOpenRequest() throws Exception {
-        Field openRequests = sut.getClass().getDeclaredField("openRequests");
-        openRequests.setAccessible(true);
-        AtomicInteger atomicInteger = (AtomicInteger) openRequests.get(sut);
-        atomicInteger.incrementAndGet();
-        sut.doStop();
-    }
-
-    @Test
-    public void doStopBadConnection() throws Exception {
-        Field openRequests = sut.getClass().getDeclaredField("plcConnection");
-        openRequests.setAccessible(true);
-        PlcConnection plcConnectionMock = mock(PlcConnection.class);
-        doThrow(new RuntimeException("oh noes")).when(plcConnectionMock).close();
-        openRequests.set(sut, plcConnectionMock);
+        sut.openRequests.incrementAndGet();
         sut.doStop();
     }
-
 }