You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/01/03 15:37:17 UTC

[camel] branch main updated: camel-plc4x: Connection now established in doStart() of Producer/Consumer instead of constructors (#8967)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 3ea089b2fe3 camel-plc4x: Connection now established in doStart() of Producer/Consumer instead of constructors (#8967)
3ea089b2fe3 is described below

commit 3ea089b2fe365b3f94bcf9a93d28de8e94f204e0
Author: Stein Overtoom <st...@overtoom.email>
AuthorDate: Tue Jan 3 16:37:07 2023 +0100

    camel-plc4x: Connection now established in doStart() of Producer/Consumer instead of constructors (#8967)
    
    Co-authored-by: Stein Overtoom <st...@triopsys.nl>
---
 .../camel/component/plc4x/Plc4XConsumer.java       | 13 ++++++---
 .../camel/component/plc4x/Plc4XEndpoint.java       | 33 +++++++---------------
 .../camel/component/plc4x/Plc4XProducer.java       | 20 +++++++++----
 .../camel/component/plc4x/Plc4XEndpointTest.java   |  2 +-
 .../camel/component/plc4x/Plc4XProducerTest.java   |  1 +
 5 files changed, 36 insertions(+), 33 deletions(-)

diff --git a/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XConsumer.java b/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XConsumer.java
index bd82e76e861..c7b74c8aefc 100644
--- a/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XConsumer.java
+++ b/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XConsumer.java
@@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory;
 public class Plc4XConsumer extends DefaultConsumer {
     private static final Logger LOGGER = LoggerFactory.getLogger(Plc4XConsumer.class);
 
-    private final PlcConnection plcConnection;
+    private PlcConnection plcConnection;
     private final Map<String, Object> tags;
     private final String trigger;
     private final Plc4XEndpoint plc4XEndpoint;
@@ -56,7 +56,6 @@ public class Plc4XConsumer extends DefaultConsumer {
     public Plc4XConsumer(Plc4XEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
         this.plc4XEndpoint = endpoint;
-        this.plcConnection = endpoint.getConnection();
         this.tags = endpoint.getTags();
         this.trigger = endpoint.getTrigger();
     }
@@ -72,7 +71,12 @@ public class Plc4XConsumer extends DefaultConsumer {
     }
 
     @Override
-    protected void doStart() throws ScraperException {
+    protected void doStart() throws Exception {
+        super.doStart();
+        this.plcConnection = plc4XEndpoint.getConnection();
+        if (!plcConnection.isConnected()) {
+            plc4XEndpoint.reconnect();
+        }
         if (trigger == null) {
             startUnTriggered();
         } else {
@@ -147,11 +151,12 @@ public class Plc4XConsumer extends DefaultConsumer {
     }
 
     @Override
-    protected void doStop() {
+    protected void doStop() throws Exception {
         // First stop the polling process
         if (future != null) {
             future.cancel(true);
         }
+        super.doStop();
     }
 
 }
diff --git a/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XEndpoint.java b/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XEndpoint.java
index 4ad9fa58694..b714c93a853 100644
--- a/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XEndpoint.java
+++ b/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XEndpoint.java
@@ -32,7 +32,6 @@ import org.apache.camel.support.DefaultEndpoint;
 import org.apache.plc4x.java.PlcDriverManager;
 import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
 import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
 
 /**
@@ -60,13 +59,10 @@ public class Plc4XEndpoint extends DefaultEndpoint {
     private PlcConnection connection;
     private String uri;
 
-    public Plc4XEndpoint(String endpointUri, Component component) throws PlcConnectionException {
+    public Plc4XEndpoint(String endpointUri, Component component) {
         super(endpointUri, component);
         this.plcDriverManager = new PlcDriverManager();
-        //Here we establish the connection in the endpoint, as it is created once during the context
-        // to avoid disconnecting and reconnecting for every request
         this.uri = endpointUri.replaceFirst("plc4x:/?/?", "");
-        this.connection = plcDriverManager.getConnection(this.uri);
     }
 
     public int getPeriod() {
@@ -88,35 +84,26 @@ public class Plc4XEndpoint extends DefaultEndpoint {
     public void setTrigger(String trigger) {
         this.trigger = trigger;
         plcDriverManager = new PooledPlcDriverManager();
-        String plc4xURI = uri.replaceFirst("plc4x:/?/?", "");
-        // TODO: is this mutation really intentional
-        uri = plc4xURI;
-        try {
-            connection = plcDriverManager.getConnection(plc4xURI);
-        } catch (PlcConnectionException e) {
-            throw new PlcRuntimeException(e);
-        }
     }
 
-    public PlcConnection getConnection() {
+    public PlcConnection getConnection() throws PlcConnectionException {
+        if (this.connection == null) {
+            this.connection = plcDriverManager.getConnection(this.uri);
+        }
         return connection;
     }
 
+    public void reconnect() throws PlcConnectionException {
+        connection.connect();
+    }
+
     @Override
-    public Producer createProducer() throws Exception {
-        //Checking if connection is still up and reconnecting if not
-        if (!connection.isConnected()) {
-            connection = plcDriverManager.getConnection(uri.replaceFirst("plc4x:/?/?", ""));
-        }
+    public Producer createProducer() {
         return new Plc4XProducer(this);
     }
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        //Checking if connection is still up and reconnecting if not
-        if (!connection.isConnected()) {
-            connection = plcDriverManager.getConnection(uri.replaceFirst("plc4x:/?/?", ""));
-        }
         Plc4XConsumer consumer = new Plc4XConsumer(this, processor);
         configureConsumer(consumer);
         return consumer;
diff --git a/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XProducer.java b/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XProducer.java
index 2e149ab042e..9db0c6db4d7 100644
--- a/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XProducer.java
+++ b/components/camel-plc4x/src/main/java/org/apache/camel/component/plc4x/Plc4XProducer.java
@@ -36,15 +36,24 @@ public class Plc4XProducer extends DefaultAsyncProducer {
     private final Logger log = LoggerFactory.getLogger(Plc4XProducer.class);
     private PlcConnection plcConnection;
     private AtomicInteger openRequests;
+    private final Plc4XEndpoint plc4XEndpoint;
 
-    public Plc4XProducer(Plc4XEndpoint endpoint) throws PlcException {
+    public Plc4XProducer(Plc4XEndpoint endpoint) {
         super(endpoint);
-        String plc4xURI = endpoint.getEndpointUri().replaceFirst("plc4x:/?/?", "");
-        this.plcConnection = endpoint.getConnection();
+        plc4XEndpoint = endpoint;
+        openRequests = new AtomicInteger();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        this.plcConnection = plc4XEndpoint.getConnection();
+        if (!plcConnection.isConnected()) {
+            plc4XEndpoint.reconnect();
+        }
         if (!plcConnection.getMetadata().canWrite()) {
-            throw new PlcException("This connection (" + plc4xURI + ") doesn't support writing.");
+            throw new PlcException("This connection (" + plc4XEndpoint.getUri() + ") doesn't support writing.");
         }
-        openRequests = new AtomicInteger();
     }
 
     @Override
@@ -99,6 +108,7 @@ public class Plc4XProducer extends DefaultAsyncProducer {
 
     @Override
     protected void doStop() throws Exception {
+        super.doStop();
         int openRequestsAtStop = openRequests.get();
         log.debug("Stopping with {} open requests", openRequestsAtStop);
         if (openRequestsAtStop > 0) {
diff --git a/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XEndpointTest.java b/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XEndpointTest.java
index 388d14a5b1b..c72002835a5 100644
--- a/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XEndpointTest.java
+++ b/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XEndpointTest.java
@@ -40,7 +40,7 @@ public class Plc4XEndpointTest {
 
     // TODO: figure out what this is
     @Test
-    public void createProducer() throws Exception {
+    public void createProducer() {
         assertThat(sut.createProducer(), notNullValue());
     }
 
diff --git a/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XProducerTest.java b/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XProducerTest.java
index 2dcbc6c1cda..35250d6944d 100644
--- a/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XProducerTest.java
+++ b/components/camel-plc4x/src/test/java/org/apache/camel/component/plc4x/Plc4XProducerTest.java
@@ -50,6 +50,7 @@ public class Plc4XProducerTest {
 
         when(endpointMock.getConnection()).thenReturn(mockConnection);
         sut = new Plc4XProducer(endpointMock);
+        sut.doStart();
         testExchange = mock(Exchange.class, RETURNS_DEEP_STUBS);
         Map<String, Map<String, Object>> tags = new HashMap();
         tags.put("test1", Collections.singletonMap("testAddress1", 0));