You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/01/03 15:37:17 UTC
[camel] branch main updated: camel-plc4x: Connection now established in doStart() of Producer/Consumer instead of constructors (#8967)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 3ea089b2fe3 camel-plc4x: Connection now established in doStart() of Producer/Consumer instead of constructors (#8967)
3ea089b2fe3 is described below
commit 3ea089b2fe365b3f94bcf9a93d28de8e94f204e0
Author: Stein Overtoom <st...@overtoom.email>
AuthorDate: Tue Jan 3 16:37:07 2023 +0100
camel-plc4x: Connection now established in doStart() of Producer/Consumer instead of constructors (#8967)
Co-authored-by: Stein Overtoom <st...@triopsys.nl>
---
.../camel/component/plc4x/Plc4XConsumer.java | 13 ++++++---
.../camel/component/plc4x/Plc4XEndpoint.java | 33 +++++++---------------
.../camel/component/plc4x/Plc4XProducer.java | 20 +++++++++----
.../camel/component/plc4x/Plc4XEndpointTest.java | 2 +-
.../camel/component/plc4x/Plc4XProducerTest.java | 1 +
5 files changed, 36 insertions(+), 33 deletions(-)
diff --git a/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XConsumer.java b/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XConsumer.java
index bd82e76e861..c7b74c8aefc 100644
--- a/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XConsumer.java
+++ b/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XConsumer.java
@@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory;
public class Plc4XConsumer extends DefaultConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(Plc4XConsumer.class);
- private final PlcConnection plcConnection;
+ private PlcConnection plcConnection;
private final Map<String, Object> tags;
private final String trigger;
private final Plc4XEndpoint plc4XEndpoint;
@@ -56,7 +56,6 @@ public class Plc4XConsumer extends DefaultConsumer {
public Plc4XConsumer(Plc4XEndpoint endpoint, Processor processor) {
super(endpoint, processor);
this.plc4XEndpoint = endpoint;
- this.plcConnection = endpoint.getConnection();
this.tags = endpoint.getTags();
this.trigger = endpoint.getTrigger();
}
@@ -72,7 +71,12 @@ public class Plc4XConsumer extends DefaultConsumer {
}
@Override
- protected void doStart() throws ScraperException {
+ protected void doStart() throws Exception {
+ super.doStart();
+ this.plcConnection = plc4XEndpoint.getConnection();
+ if (!plcConnection.isConnected()) {
+ plc4XEndpoint.reconnect();
+ }
if (trigger == null) {
startUnTriggered();
} else {
@@ -147,11 +151,12 @@ public class Plc4XConsumer extends DefaultConsumer {
}
@Override
- protected void doStop() {
+ protected void doStop() throws Exception {
// First stop the polling process
if (future != null) {
future.cancel(true);
}
+ super.doStop();
}
}
diff --git a/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XEndpoint.java b/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XEndpoint.java
index 4ad9fa58694..b714c93a853 100644
--- a/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XEndpoint.java
+++ b/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XEndpoint.java
@@ -32,7 +32,6 @@ import org.apache.camel.support.DefaultEndpoint;
import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
/**
@@ -60,13 +59,10 @@ public class Plc4XEndpoint extends DefaultEndpoint {
private PlcConnection connection;
private String uri;
- public Plc4XEndpoint(String endpointUri, Component component) throws PlcConnectionException {
+ public Plc4XEndpoint(String endpointUri, Component component) {
super(endpointUri, component);
this.plcDriverManager = new PlcDriverManager();
- //Here we establish the connection in the endpoint, as it is created once during the context
- // to avoid disconnecting and reconnecting for every request
this.uri = endpointUri.replaceFirst("plc4x:/?/?", "");
- this.connection = plcDriverManager.getConnection(this.uri);
}
public int getPeriod() {
@@ -88,35 +84,26 @@ public class Plc4XEndpoint extends DefaultEndpoint {
public void setTrigger(String trigger) {
this.trigger = trigger;
plcDriverManager = new PooledPlcDriverManager();
- String plc4xURI = uri.replaceFirst("plc4x:/?/?", "");
- // TODO: is this mutation really intentional
- uri = plc4xURI;
- try {
- connection = plcDriverManager.getConnection(plc4xURI);
- } catch (PlcConnectionException e) {
- throw new PlcRuntimeException(e);
- }
}
- public PlcConnection getConnection() {
+ public PlcConnection getConnection() throws PlcConnectionException {
+ if (this.connection == null) {
+ this.connection = plcDriverManager.getConnection(this.uri);
+ }
return connection;
}
+ public void reconnect() throws PlcConnectionException {
+ connection.connect();
+ }
+
@Override
- public Producer createProducer() throws Exception {
- //Checking if connection is still up and reconnecting if not
- if (!connection.isConnected()) {
- connection = plcDriverManager.getConnection(uri.replaceFirst("plc4x:/?/?", ""));
- }
+ public Producer createProducer() {
return new Plc4XProducer(this);
}
@Override
public Consumer createConsumer(Processor processor) throws Exception {
- //Checking if connection is still up and reconnecting if not
- if (!connection.isConnected()) {
- connection = plcDriverManager.getConnection(uri.replaceFirst("plc4x:/?/?", ""));
- }
Plc4XConsumer consumer = new Plc4XConsumer(this, processor);
configureConsumer(consumer);
return consumer;
diff --git a/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XProducer.java b/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XProducer.java
index 2e149ab042e..9db0c6db4d7 100644
--- a/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XProducer.java
+++ b/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XProducer.java
@@ -36,15 +36,24 @@ public class Plc4XProducer extends DefaultAsyncProducer {
private final Logger log = LoggerFactory.getLogger(Plc4XProducer.class);
private PlcConnection plcConnection;
private AtomicInteger openRequests;
+ private final Plc4XEndpoint plc4XEndpoint;
- public Plc4XProducer(Plc4XEndpoint endpoint) throws PlcException {
+ public Plc4XProducer(Plc4XEndpoint endpoint) {
super(endpoint);
- String plc4xURI = endpoint.getEndpointUri().replaceFirst("plc4x:/?/?", "");
- this.plcConnection = endpoint.getConnection();
+ plc4XEndpoint = endpoint;
+ openRequests = new AtomicInteger();
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+ this.plcConnection = plc4XEndpoint.getConnection();
+ if (!plcConnection.isConnected()) {
+ plc4XEndpoint.reconnect();
+ }
if (!plcConnection.getMetadata().canWrite()) {
- throw new PlcException("This connection (" + plc4xURI + ") doesn't support writing.");
+ throw new PlcException("This connection (" + plc4XEndpoint.getUri() + ") doesn't support writing.");
}
- openRequests = new AtomicInteger();
}
@Override
@@ -99,6 +108,7 @@ public class Plc4XProducer extends DefaultAsyncProducer {
@Override
protected void doStop() throws Exception {
+ super.doStop();
int openRequestsAtStop = openRequests.get();
log.debug("Stopping with {} open requests", openRequestsAtStop);
if (openRequestsAtStop > 0) {
diff --git a/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XEndpointTest.java b/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XEndpointTest.java
index 388d14a5b1b..c72002835a5 100644
--- a/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XEndpointTest.java
+++ b/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XEndpointTest.java
@@ -40,7 +40,7 @@ public class Plc4XEndpointTest {
// TODO: figure out what this is
@Test
- public void createProducer() throws Exception {
+ public void createProducer() {
assertThat(sut.createProducer(), notNullValue());
}
diff --git a/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XProducerTest.java b/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XProducerTest.java
index 2dcbc6c1cda..35250d6944d 100644
--- a/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XProducerTest.java
+++ b/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XProducerTest.java
@@ -50,6 +50,7 @@ public class Plc4XProducerTest {
when(endpointMock.getConnection()).thenReturn(mockConnection);
sut = new Plc4XProducer(endpointMock);
+ sut.doStart();
testExchange = mock(Exchange.class, RETURNS_DEEP_STUBS);
Map<String, Map<String, Object>> tags = new HashMap();
tags.put("test1", Collections.singletonMap("testAddress1", 0));