You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by hu...@apache.org on 2022/05/22 19:33:07 UTC

[plc4x] branch develop updated: fix(plc4j/scraper): Handle broken connections when an exception occurs when calling getConnection()

This is an automated email from the ASF dual-hosted git repository.

hutcheb pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new ec16e7426c fix(plc4j/scraper): Handle broken connections when an exception occurs when calling getConnection()
ec16e7426c is described below

commit ec16e7426c3f447ef4a8a038206f975eae92d4b9
Author: hutcheb <be...@gmail.com>
AuthorDate: Sun May 22 19:30:55 2022 +0000

    fix(plc4j/scraper): Handle broken connections when an exception occurs when calling getConnection()
---
 .../src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java       | 4 ++--
 .../plc4x/java/utils/connectionpool2/CachedDriverManager.java       | 6 +++++-
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
index 9fa01fdc72..700fab1420 100644
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
@@ -154,8 +154,7 @@ public class Plc4xSourceTask extends SourceTask {
 
         try {
             PlcDriverManager manager = new PooledDriverManager();
-            PlcDriverManager plcDriverManager = new CachedDriverManager(plc4xConnectionString,() -> manager.getConnection(plc4xConnectionString));
-            TriggerCollector triggerCollector = new TriggerCollectorImpl(plcDriverManager);
+            TriggerCollector triggerCollector = new TriggerCollectorImpl(manager);
             scraper = new TriggeredScraperImpl(scraperConfig, (jobName, sourceName, results) -> {
                 try {
                     Long timestamp = System.currentTimeMillis();
@@ -235,6 +234,7 @@ public class Plc4xSourceTask extends SourceTask {
             triggerCollector.start();
         } catch (ScraperException e) {
             log.error("Error starting the scraper", e);
+
         }
     }
 
diff --git a/plc4j/tools/connection-cache/src/main/java/org/apache/plc4x/java/utils/connectionpool2/CachedDriverManager.java b/plc4j/tools/connection-cache/src/main/java/org/apache/plc4x/java/utils/connectionpool2/CachedDriverManager.java
index 134495fb94..c76b1c8783 100644
--- a/plc4j/tools/connection-cache/src/main/java/org/apache/plc4x/java/utils/connectionpool2/CachedDriverManager.java
+++ b/plc4j/tools/connection-cache/src/main/java/org/apache/plc4x/java/utils/connectionpool2/CachedDriverManager.java
@@ -178,8 +178,10 @@ public class CachedDriverManager extends PlcDriverManager implements CachedDrive
         try {
             return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
         } catch (ExecutionException | TimeoutException e) {
+            handleBrokenConnection();
             throw new PlcConnectionException("No Connection Available, timed out while waiting in queue.", e);
         } catch (InterruptedException e) {
+            handleBrokenConnection();
             Thread.currentThread().interrupt();
             throw new PlcConnectionException("No Connection Available, interrupted while waiting in queue.", e);
         } finally {
@@ -283,7 +285,9 @@ public class CachedDriverManager extends PlcDriverManager implements CachedDrive
     }
 
     private void cancelWatchdog() {
-        borrowWatchdog.cancel(false);
+        if (borrowWatchdog != null ) {
+            borrowWatchdog.cancel(false);
+        }
     }
 
     @Override