You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/03/22 17:25:32 UTC

[camel] branch master updated: CAMEL-16025: [camel-mongodb] ChangeStreams: Exception not gracefully handled

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a6df30  CAMEL-16025: [camel-mongodb] ChangeStreams: Exception not gracefully handled
2a6df30 is described below

commit 2a6df30e1f34d8fc4f5b96c535df022f494f3e69
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Mar 22 18:25:00 2021 +0100

    CAMEL-16025: [camel-mongodb] ChangeStreams: Exception not gracefully handled
---
 .../mongodb/MongoAbstractConsumerThread.java       | 43 ++++++++++++++--------
 .../mongodb/MongoDbChangeStreamsThread.java        |  4 +-
 2 files changed, 30 insertions(+), 17 deletions(-)

diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoAbstractConsumerThread.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoAbstractConsumerThread.java
index b129351..1209d9a 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoAbstractConsumerThread.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoAbstractConsumerThread.java
@@ -64,26 +64,39 @@ abstract class MongoAbstractConsumerThread implements Runnable {
     @Override
     public void run() {
         stoppedLatch = new CountDownLatch(1);
-        while (keepRunning) {
-            doRun();
-            // regenerate the cursor, if reading failed for some reason
-            if (keepRunning) {
-                cursor.close();
-                regeneratingCursor();
-
-                if (cursorRegenerationDelayEnabled) {
-                    try {
-                        Thread.sleep(cursorRegenerationDelay);
-                    } catch (InterruptedException ignored) {
+        try {
+            while (keepRunning) {
+                try {
+                    doRun();
+                } catch (Exception e) {
+                    if (keepRunning) {
+                        log.warn("Exception from consuming from MongoDB caused by " + e.getMessage()
+                                 + ". Will try again on next poll.");
+                    } else {
+                        log.warn("Exception from consuming from MongoDB caused by " + e.getMessage()
+                                 + ". ConsumerThread will be stopped.",
+                                e);
                     }
                 }
+                // regenerate the cursor, if reading failed for some reason
+                if (keepRunning) {
+                    cursor.close();
+                    regeneratingCursor();
+
+                    if (cursorRegenerationDelayEnabled) {
+                        try {
+                            Thread.sleep(cursorRegenerationDelay);
+                        } catch (InterruptedException ignored) {
+                        }
+                    }
 
-                cursor = initializeCursor();
+                    cursor = initializeCursor();
+                }
             }
+        } finally {
+            stopped = true;
+            stoppedLatch.countDown();
         }
-
-        stopped = true;
-        stoppedLatch.countDown();
     }
 
     protected void stop() throws Exception {
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
index 8757793..7184797 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
@@ -96,9 +96,9 @@ class MongoDbChangeStreamsThread extends MongoAbstractConsumerThread {
             // it throws exception when cursor is closed in another thread
             // there is no way to stop hasNext() before closing cursor
             if (keepRunning) {
-                throw e;
-            } else {
                 log.debug("Exception from MongoDB, will regenerate cursor.", e);
+            } else {
+                throw e;
             }
         }
     }