You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/02/01 13:19:54 UTC
[camel] branch camel-3.x updated: camel-plc4x: plc4x improvements in camel 3 (CAMEL-18861) (#9282)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.x by this push:
new d0bdd5ebfc1 camel-plc4x: plc4x improvements in camel 3 (CAMEL-18861) (#9282)
d0bdd5ebfc1 is described below
commit d0bdd5ebfc1290c1d4aad95657153376bada0df5
Author: Stein Overtoom <st...@overtoom.email>
AuthorDate: Wed Feb 1 14:19:47 2023 +0100
camel-plc4x: plc4x improvements in camel 3 (CAMEL-18861) (#9282)
* camel-plc4x: Connection now established in doStart() of Producer/Consumer instead of constructors (#8967)
Co-authored-by: Stein Overtoom <st...@triopsys.nl>
* camel-plc4x: Added autoReconnect uri parameter (#8971)
* camel-plc4x: Added autoReconnect uri parameter
* camel-plc4x: fix autoReconnect parameter style
* camel-plc4x: autoReconnect parameter, log level to warn (code review)
Co-authored-by: Stein Overtoom <st...@triopsys.nl>
* camel-plc4x Implemented Plc4XPollingConsumer and some refactoring (CAMEL-18861) (#9254)
* camel-plc4x: fixed Plc4XComponentTest unit test (CAMEL-18861)
* camel-plc4x: Implemented Plc4XPollingConsumer (CAMEL-18861)
* camel-plc4x: Refactored building ReadRequest into separate method (CAMEL-18861)
* camel-plc4x: Improved auto-reconnect (CAMEL-18861)
- An exchange with an empty map is created when no connection could be made, instead of no exchange at all
- shuts down endpoint if auto-reconnect is turned off and connection is lost (upon first request)
* camel-plc4x: Moved connection unit test from producer to endpoint test (CAMEL-18861)
* camel-plc4x: Fixed unit tests, removed reflection from Endpoint and Producer tests and fixed canWrite bug in Producer (CAMEL-18861)
* camel-plc4x: Exceptions are now only logged at trace level (CAMEL-18861)
* camel-plc4x: Updated readme with PollingConsumer (CAMEL-18861)
* camel-plc4x: Fixed formatting after Psourcecheck (CAMEL-18861)
* camel-plc4x: Removed unused imports in Plc4XConsumer (CAMEL-18861)
* camel-plc4x: Ran formatter (CAMEL-18861)
---------
Co-authored-by: Stein Overtoom <st...@triopsys.nl>
* camel-plc4x: Fixed inconsistently passing/failing unit test (CAMEL-18861) (#9280)
Co-authored-by: Stein Overtoom <st...@triopsys.nl>
---------
Co-authored-by: Stein Overtoom <st...@triopsys.nl>
---
.../camel-plc4x/src/main/docs/plc4x-component.adoc | 7 +-
.../camel/component/plc4x/Plc4XConsumer.java | 53 +++++----
.../camel/component/plc4x/Plc4XEndpoint.java | 117 ++++++++++++++++++--
.../component/plc4x/Plc4XPollingConsumer.java | 123 +++++++++++++++++++++
.../camel/component/plc4x/Plc4XProducer.java | 45 ++++----
.../camel/component/plc4x/Plc4XComponentTest.java | 4 +-
.../camel/component/plc4x/Plc4XEndpointTest.java | 9 ++
.../camel/component/plc4x/Plc4XProducerTest.java | 29 +----
8 files changed, 298 insertions(+), 89 deletions(-)
diff --git a/components/camel-plc4x/src/main/docs/plc4x-component.adoc b/components/camel-plc4x/src/main/docs/plc4x-component.adoc
index 87cedd0bec9..7bf4450d524 100644
--- a/components/camel-plc4x/src/main/docs/plc4x-component.adoc
+++ b/components/camel-plc4x/src/main/docs/plc4x-component.adoc
@@ -63,12 +63,15 @@ where `$\{camel-version}` must be replaced by the actual version of Camel.
== Consumer
-The consumer supports one-time reading or Triggered Reading. (_Schedulded Reading using Period only soon_).To read from
+The consumer supports one-time reading or Triggered Reading. To read from
the PLC, use a `Map<String,String>` containing the Alias and Queries for the Data you want.
-The Body create by the Consumer will be a `Map<String,Object>` containing the Aliases and there associated value
+The Body created by the Consumer will be a `Map<String,Object>` containing the Aliases and their associated value
read from the PLC.
+== Polling Consumer
+The polling consumer supports consecutive reading. The input and output is the same as for the regular consumer.
+
== Producer
To write data to the PLC, we also use a `Map`. The difference with the Producer is that the `Value` of the Map has also to
be a Map. Also, this `Map` has to be set into the `Body` of the `Message`
diff --git a/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XConsumer.java b/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XConsumer.java
index 69bd8f56e8b..7f1607eebec 100644
--- a/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XConsumer.java
+++ b/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XConsumer.java
@@ -29,9 +29,7 @@ import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.support.DefaultConsumer;
-import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.api.exceptions.PlcIncompatibleDatatypeException;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.scraper.config.JobConfigurationImpl;
import org.apache.plc4x.java.scraper.config.ScraperConfiguration;
@@ -46,7 +44,6 @@ import org.slf4j.LoggerFactory;
public class Plc4XConsumer extends DefaultConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(Plc4XConsumer.class);
- private PlcConnection plcConnection;
private final Map<String, Object> tags;
private final String trigger;
private final Plc4XEndpoint plc4XEndpoint;
@@ -74,9 +71,15 @@ public class Plc4XConsumer extends DefaultConsumer {
@Override
protected void doStart() throws Exception {
super.doStart();
- this.plcConnection = plc4XEndpoint.getConnection();
- if (!plcConnection.isConnected()) {
- plc4XEndpoint.reconnect();
+ try {
+ plc4XEndpoint.setupConnection();
+ } catch (PlcConnectionException e) {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.error("Connection setup failed, stopping Consumer", e);
+ } else {
+ LOGGER.error("Connection setup failed, stopping Consumer");
+ }
+ doStop();
}
if (trigger == null) {
startUnTriggered();
@@ -86,25 +89,19 @@ public class Plc4XConsumer extends DefaultConsumer {
}
private void startUnTriggered() {
- if (plc4XEndpoint.isAutoReconnect() && !plcConnection.isConnected()) {
- try {
- plc4XEndpoint.reconnect();
- LOGGER.debug("Successfully reconnected");
- } catch (PlcConnectionException e) {
+ try {
+ plc4XEndpoint.reconnectIfNeeded();
+ } catch (PlcConnectionException e) {
+ if (LOGGER.isTraceEnabled()) {
LOGGER.warn("Unable to reconnect, skipping request", e);
- return;
- }
- }
- PlcReadRequest.Builder builder = plcConnection.readRequestBuilder();
- for (Map.Entry<String, Object> tag : tags.entrySet()) {
- try {
- builder.addItem(tag.getKey(), (String) tag.getValue());
- } catch (PlcIncompatibleDatatypeException e) {
- LOGGER.error("For consumer, please use Map<String,String>, currently using {}",
- tags.getClass().getSimpleName());
+ } else {
+ LOGGER.warn("Unable to reconnect, skipping request");
}
+ return;
}
- PlcReadRequest request = builder.build();
+
+ PlcReadRequest request = plc4XEndpoint.buildPlcReadRequest();
+
future = executorService.schedule(() -> request.execute().thenAccept(response -> {
try {
Exchange exchange = plc4XEndpoint.createExchange();
@@ -126,15 +123,17 @@ public class Plc4XConsumer extends DefaultConsumer {
TriggeredScraperImpl scraper = new TriggeredScraperImpl(configuration, (job, alias, response) -> {
try {
- if (plc4XEndpoint.isAutoReconnect() && !plcConnection.isConnected()) {
- plc4XEndpoint.reconnect();
- LOGGER.debug("Successfully reconnected");
- }
+ plc4XEndpoint.reconnectIfNeeded();
+
Exchange exchange = plc4XEndpoint.createExchange();
exchange.getIn().setBody(response);
getProcessor().process(exchange);
} catch (PlcConnectionException e) {
- LOGGER.warn("Unable to reconnect, skipping request", e);
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.warn("Unable to reconnect, skipping request", e);
+ } else {
+ LOGGER.warn("Unable to reconnect, skipping request");
+ }
} catch (Exception e) {
getExceptionHandler().handleException(e);
}
diff --git a/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XEndpoint.java b/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XEndpoint.java
index 7a1a70b2118..1d8a51c639b 100644
--- a/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XEndpoint.java
+++ b/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XEndpoint.java
@@ -22,6 +22,7 @@ import java.util.Objects;
import org.apache.camel.Category;
import org.apache.camel.Component;
import org.apache.camel.Consumer;
+import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.spi.Metadata;
@@ -32,7 +33,12 @@ import org.apache.camel.support.DefaultEndpoint;
import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.exceptions.PlcIncompatibleDatatypeException;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcWriteRequest;
import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Read and write to PLC devices
@@ -40,6 +46,10 @@ import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
@UriEndpoint(scheme = "plc4x", firstVersion = "3.20.0", title = "PLC4X",
syntax = "plc4x:driver", category = Category.IOT)
public class Plc4XEndpoint extends DefaultEndpoint {
+ private static final Logger LOGGER = LoggerFactory.getLogger(Plc4XEndpoint.class);
+
+ protected PlcDriverManager plcDriverManager;
+ protected PlcConnection connection;
@UriPath
@Metadata(required = true, description = "PLC4X connection string for the connection to the target")
@@ -58,8 +68,6 @@ public class Plc4XEndpoint extends DefaultEndpoint {
@Metadata(description = "Whether to reconnect when no connection is present upon doing a request")
private boolean autoReconnect;
- private PlcDriverManager plcDriverManager;
- private PlcConnection connection;
private String uri;
public Plc4XEndpoint(String endpointUri, Component component) {
@@ -97,15 +105,63 @@ public class Plc4XEndpoint extends DefaultEndpoint {
return autoReconnect;
}
- public PlcConnection getConnection() throws PlcConnectionException {
- if (this.connection == null) {
- this.connection = plcDriverManager.getConnection(this.uri);
+ /**
+ * Set up the connection.
+ * <p>
+ *
+ * @throws PlcConnectionException if no connection could be established and auto-reconnect is turned off
+ */
+ public void setupConnection() throws PlcConnectionException {
+ try {
+ connection = plcDriverManager.getConnection(this.uri);
+ if (!connection.isConnected()) {
+ reconnectIfNeeded();
+ }
+ } catch (PlcConnectionException e) {
+ if (isAutoReconnect()) {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.warn("Could not connect during setup, retrying on next request", e);
+ } else {
+ LOGGER.warn("Could not connect during setup, retrying on next request");
+ }
+ } else {
+ LOGGER.warn("Could not connect during setup and auto reconnect is turned off");
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * Reconnects if needed. If connection is lost and auto-reconnect is turned off, endpoint will be shutdown.
+ * <p>
+ *
+ * @throws PlcConnectionException If reconnect failed and auto-reconnect is turned on
+ */
+ public void reconnectIfNeeded() throws PlcConnectionException {
+ if (connection != null && connection.isConnected()) {
+ LOGGER.trace("No reconnect needed, already connected");
+ } else if (autoReconnect && connection == null) {
+ connection = plcDriverManager.getConnection(uri);
+ LOGGER.debug("Successfully reconnected");
+ } else if (autoReconnect && !connection.isConnected()) {
+ connection.connect();
+ // If reconnection fails without Exception, reset connection
+ if (!connection.isConnected()) {
+ LOGGER.debug("No connection established after connect, resetting connection");
+ connection = plcDriverManager.getConnection(uri);
+ }
+ LOGGER.debug("Successfully reconnected");
+ } else {
+ LOGGER.warn("Connection lost and auto-reconnect is turned off, shutting down Plc4XEndpoint");
+ stop();
}
- return connection;
}
- public void reconnect() throws PlcConnectionException {
- connection.connect();
+ /**
+ * @return true if connection supports writing, else false
+ */
+ public boolean canWrite() {
+ return connection.getMetadata().canWrite();
}
@Override
@@ -120,6 +176,51 @@ public class Plc4XEndpoint extends DefaultEndpoint {
return consumer;
}
+ @Override
+ public PollingConsumer createPollingConsumer() {
+ LOGGER.debug("Creating Plc4XPollingConsumer");
+ return new Plc4XPollingConsumer(this);
+ }
+
+ /**
+ * Build a {@link PlcReadRequest} using the tags specified in the endpoint.
+ * <p>
+ *
+ * @return {@link PlcReadRequest}
+ */
+ public PlcReadRequest buildPlcReadRequest() {
+ PlcReadRequest.Builder builder = connection.readRequestBuilder();
+ for (Map.Entry<String, Object> tag : tags.entrySet()) {
+ try {
+ builder.addItem(tag.getKey(), (String) tag.getValue());
+ } catch (PlcIncompatibleDatatypeException e) {
+ LOGGER.error("For consumer, please use Map<String,String>, currently using {}",
+ tags.getClass().getSimpleName());
+ }
+ }
+ return builder.build();
+ }
+
+ /**
+ * Build a {@link PlcWriteRequest}.
+ * <p>
+ *
+ * @param tags tags to add to write request
+ * @return {@link PlcWriteRequest}
+ */
+ public PlcWriteRequest buildPlcWriteRequest(Map<String, Map<String, Object>> tags) {
+ PlcWriteRequest.Builder builder = connection.writeRequestBuilder();
+
+ for (Map.Entry<String, Map<String, Object>> entry : tags.entrySet()) {
+ //Tags are stored like this --> Map<Tagname,Map<Query,Value>> for writing
+ String name = entry.getKey();
+ String query = entry.getValue().keySet().iterator().next();
+ Object value = entry.getValue().get(query);
+ builder.addItem(name, query, value);
+ }
+ return builder.build();
+ }
+
public PlcDriverManager getPlcDriverManager() {
return plcDriverManager;
}
diff --git a/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XPollingConsumer.java b/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XPollingConsumer.java
new file mode 100644
index 00000000000..edc83475e0d
--- /dev/null
+++ b/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XPollingConsumer.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.plc4x;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.EventDrivenPollingConsumer;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Plc4XPollingConsumer extends EventDrivenPollingConsumer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(Plc4XPollingConsumer.class);
+
+ private final Plc4XEndpoint plc4XEndpoint;
+
+ public Plc4XPollingConsumer(Plc4XEndpoint endpoint) {
+ super(endpoint);
+ this.plc4XEndpoint = endpoint;
+ }
+
+ @Override
+ public String toString() {
+ return "Plc4XPollingConsumer[" + plc4XEndpoint + "]";
+ }
+
+ @Override
+ public Endpoint getEndpoint() {
+ return plc4XEndpoint;
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+ try {
+ plc4XEndpoint.setupConnection();
+ } catch (PlcConnectionException e) {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.error("Connection setup failed, stopping PollingConsumer", e);
+ } else {
+ LOGGER.error("Connection setup failed, stopping PollingConsumer");
+ }
+ doStop();
+ }
+ }
+
+ @Override
+ public Exchange receive() {
+ return doReceive(-1);
+ }
+
+ @Override
+ public Exchange receiveNoWait() {
+ return doReceive(0);
+ }
+
+ @Override
+ public Exchange receive(long timeout) {
+ return doReceive(timeout);
+ }
+
+ protected Exchange doReceive(long timeout) {
+ Exchange exchange = plc4XEndpoint.createExchange();
+ try {
+ plc4XEndpoint.reconnectIfNeeded();
+
+ PlcReadRequest request = plc4XEndpoint.buildPlcReadRequest();
+ CompletableFuture<? extends PlcReadResponse> future
+ = request.execute().whenComplete((plcReadResponse, throwable) -> {
+ });
+ PlcReadResponse response;
+ if (timeout >= 0) {
+ response = future.get(timeout, TimeUnit.MILLISECONDS);
+ } else {
+ response = future.get();
+ }
+
+ Map<String, Object> rsp = new HashMap<>();
+ for (String field : response.getFieldNames()) {
+ rsp.put(field, response.getObject(field));
+ }
+ exchange.getIn().setBody(rsp);
+ } catch (ExecutionException | TimeoutException e) {
+ getExceptionHandler().handleException(e);
+ exchange.getIn().setBody(new HashMap<>());
+ } catch (InterruptedException e) {
+ getExceptionHandler().handleException(e);
+ Thread.currentThread().interrupt();
+ } catch (PlcConnectionException e) {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.warn("Unable to reconnect, skipping request", e);
+ } else {
+ LOGGER.warn("Unable to reconnect, skipping request");
+ }
+ exchange.getIn().setBody(new HashMap<>());
+ }
+ return exchange;
+ }
+
+}
diff --git a/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XProducer.java b/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XProducer.java
index 77ca0244d15..7db6b96ac9c 100644
--- a/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XProducer.java
+++ b/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XProducer.java
@@ -25,7 +25,6 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.support.DefaultAsyncProducer;
-import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.exceptions.PlcException;
import org.apache.plc4x.java.api.exceptions.PlcInvalidFieldException;
@@ -35,9 +34,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Plc4XProducer extends DefaultAsyncProducer {
+ protected AtomicInteger openRequests;
private final Logger log = LoggerFactory.getLogger(Plc4XProducer.class);
- private PlcConnection plcConnection;
- private AtomicInteger openRequests;
private final Plc4XEndpoint plc4XEndpoint;
public Plc4XProducer(Plc4XEndpoint endpoint) {
@@ -49,43 +47,44 @@ public class Plc4XProducer extends DefaultAsyncProducer {
@Override
protected void doStart() throws Exception {
super.doStart();
- this.plcConnection = plc4XEndpoint.getConnection();
- if (!plcConnection.isConnected()) {
- plc4XEndpoint.reconnect();
+ try {
+ plc4XEndpoint.setupConnection();
+ } catch (PlcConnectionException e) {
+ if (log.isTraceEnabled()) {
+ log.error("Connection setup failed, stopping producer", e);
+ } else {
+ log.error("Connection setup failed, stopping producer");
+ }
+ doStop();
}
- if (!plcConnection.getMetadata().canWrite()) {
+ if (!plc4XEndpoint.canWrite()) {
throw new PlcException("This connection (" + plc4XEndpoint.getUri() + ") doesn't support writing.");
}
}
@Override
public void process(Exchange exchange) throws Exception {
- if (plc4XEndpoint.isAutoReconnect() && !plcConnection.isConnected()) {
- try {
- plc4XEndpoint.reconnect();
- log.debug("Successfully reconnected");
- } catch (PlcConnectionException e) {
+ try {
+ plc4XEndpoint.reconnectIfNeeded();
+ } catch (PlcConnectionException e) {
+ if (log.isTraceEnabled()) {
log.warn("Unable to reconnect, skipping request", e);
- return;
+ } else {
+ log.warn("Unable to reconnect, skipping request");
}
+ return;
}
+
Message in = exchange.getIn();
Object body = in.getBody();
- PlcWriteRequest.Builder builder = plcConnection.writeRequestBuilder();
+ PlcWriteRequest plcWriteRequest;
if (body instanceof Map) { //Check if we have a Map
Map<String, Map<String, Object>> tags = (Map<String, Map<String, Object>>) body;
- for (Map.Entry<String, Map<String, Object>> entry : tags.entrySet()) {
- //Tags are stored like this --> Map<Tagname,Map<Query,Value>> for writing
- String name = entry.getKey();
- String query = entry.getValue().keySet().iterator().next();
- Object value = entry.getValue().get(query);
- builder.addItem(name, query, value);
- }
+ plcWriteRequest = plc4XEndpoint.buildPlcWriteRequest(tags);
} else {
throw new PlcInvalidFieldException("The body must contain a Map<String,Map<String,Object>");
}
-
- CompletableFuture<? extends PlcWriteResponse> completableFuture = builder.build().execute();
+ CompletableFuture<? extends PlcWriteResponse> completableFuture = plcWriteRequest.execute();
int currentlyOpenRequests = openRequests.incrementAndGet();
try {
log.debug("Currently open requests including {}:{}", exchange, currentlyOpenRequests);
diff --git a/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XComponentTest.java b/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XComponentTest.java
index 28027ee6c51..749dd89ca20 100644
--- a/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XComponentTest.java
+++ b/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XComponentTest.java
@@ -31,7 +31,6 @@ public class Plc4XComponentTest extends CamelTestSupport {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedMinimumMessageCount(1);
mock.expectedMessageCount(2);
-
template.asyncSendBody("direct:plc4x", Collections.singletonList("irrelevant"));
template.asyncSendBody("direct:plc4x2", Collections.singletonList("irrelevant"));
@@ -41,11 +40,12 @@ public class Plc4XComponentTest extends CamelTestSupport {
@Override
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
- public void configure() {
+ public void configure() throws Exception {
Map<String, Object> tags = new HashMap<>();
tags.put("Test1", "%TestQuery");
Plc4XEndpoint producer = getContext().getEndpoint("plc4x:mock:10.10.10.1/1/1", Plc4XEndpoint.class);
producer.setTags(tags);
+ producer.setAutoReconnect(true);
from("direct:plc4x")
.setBody(constant(Collections.singletonMap("test", Collections.singletonMap("testAddress", false))))
.to("plc4x:mock:10.10.10.1/1/1")
diff --git a/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XEndpointTest.java b/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XEndpointTest.java
index c72002835a5..7d84236536a 100644
--- a/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XEndpointTest.java
+++ b/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XEndpointTest.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.plc4x;
import org.apache.camel.Component;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.plc4x.java.api.PlcConnection;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -54,4 +55,12 @@ public class Plc4XEndpointTest {
assertThat(sut.isSingleton(), is(true));
}
+ @Test
+ public void doStopBadConnection() throws Exception {
+ PlcConnection plcConnectionMock = mock(PlcConnection.class);
+ sut.connection = plcConnectionMock;
+ doThrow(new RuntimeException("oh noes")).when(plcConnectionMock).close();
+ sut.doStop();
+ }
+
}
diff --git a/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XProducerTest.java b/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XProducerTest.java
index 35250d6944d..cfc6926c60d 100644
--- a/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XProducerTest.java
+++ b/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XProducerTest.java
@@ -16,16 +16,12 @@
*/
package org.apache.camel.component.plc4x;
-import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
-import org.apache.plc4x.java.api.PlcConnection;
-import org.apache.plc4x.java.api.messages.PlcWriteRequest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -41,16 +37,9 @@ public class Plc4XProducerTest {
public void setUp() throws Exception {
Plc4XEndpoint endpointMock = mock(Plc4XEndpoint.class, RETURNS_DEEP_STUBS);
when(endpointMock.getEndpointUri()).thenReturn("plc4x:mock:10.10.10.1/1/1");
- PlcConnection mockConnection = mock(PlcConnection.class, RETURNS_DEEP_STUBS);
+ when(endpointMock.canWrite()).thenReturn(true);
- when(mockConnection.getMetadata().canRead()).thenReturn(true);
- when(mockConnection.getMetadata().canWrite()).thenReturn(true);
- when(mockConnection.writeRequestBuilder())
- .thenReturn(mock(PlcWriteRequest.Builder.class, RETURNS_DEEP_STUBS));
-
- when(endpointMock.getConnection()).thenReturn(mockConnection);
sut = new Plc4XProducer(endpointMock);
- sut.doStart();
testExchange = mock(Exchange.class, RETURNS_DEEP_STUBS);
Map<String, Map<String, Object>> tags = new HashMap();
tags.put("test1", Collections.singletonMap("testAddress1", 0));
@@ -89,21 +78,7 @@ public class Plc4XProducerTest {
@Test
public void doStopOpenRequest() throws Exception {
- Field openRequests = sut.getClass().getDeclaredField("openRequests");
- openRequests.setAccessible(true);
- AtomicInteger atomicInteger = (AtomicInteger) openRequests.get(sut);
- atomicInteger.incrementAndGet();
- sut.doStop();
- }
-
- @Test
- public void doStopBadConnection() throws Exception {
- Field openRequests = sut.getClass().getDeclaredField("plcConnection");
- openRequests.setAccessible(true);
- PlcConnection plcConnectionMock = mock(PlcConnection.class);
- doThrow(new RuntimeException("oh noes")).when(plcConnectionMock).close();
- openRequests.set(sut, plcConnectionMock);
+ sut.openRequests.incrementAndGet();
sut.doStop();
}
-
}