You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/03/22 17:25:32 UTC
[camel] branch master updated: CAMEL-16025: [camel-mongodb] ChangeStreams: Exception not gracefully handled
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 2a6df30 CAMEL-16025: [camel-mongodb] ChangeStreams: Exception not gracefully handled
2a6df30 is described below
commit 2a6df30e1f34d8fc4f5b96c535df022f494f3e69
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Mar 22 18:25:00 2021 +0100
CAMEL-16025: [camel-mongodb] ChangeStreams: Exception not gracefully handled
---
.../mongodb/MongoAbstractConsumerThread.java | 43 ++++++++++++++--------
.../mongodb/MongoDbChangeStreamsThread.java | 4 +-
2 files changed, 30 insertions(+), 17 deletions(-)
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoAbstractConsumerThread.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoAbstractConsumerThread.java
index b129351..1209d9a 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoAbstractConsumerThread.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoAbstractConsumerThread.java
@@ -64,26 +64,39 @@ abstract class MongoAbstractConsumerThread implements Runnable {
@Override
public void run() {
stoppedLatch = new CountDownLatch(1);
- while (keepRunning) {
- doRun();
- // regenerate the cursor, if reading failed for some reason
- if (keepRunning) {
- cursor.close();
- regeneratingCursor();
-
- if (cursorRegenerationDelayEnabled) {
- try {
- Thread.sleep(cursorRegenerationDelay);
- } catch (InterruptedException ignored) {
+ try {
+ while (keepRunning) {
+ try {
+ doRun();
+ } catch (Exception e) {
+ if (keepRunning) {
+ log.warn("Exception from consuming from MongoDB caused by " + e.getMessage()
+ + ". Will try again on next poll.");
+ } else {
+ log.warn("Exception from consuming from MongoDB caused by " + e.getMessage()
+ + ". ConsumerThread will be stopped.",
+ e);
}
}
+ // regenerate the cursor, if reading failed for some reason
+ if (keepRunning) {
+ cursor.close();
+ regeneratingCursor();
+
+ if (cursorRegenerationDelayEnabled) {
+ try {
+ Thread.sleep(cursorRegenerationDelay);
+ } catch (InterruptedException ignored) {
+ }
+ }
- cursor = initializeCursor();
+ cursor = initializeCursor();
+ }
}
+ } finally {
+ stopped = true;
+ stoppedLatch.countDown();
}
-
- stopped = true;
- stoppedLatch.countDown();
}
protected void stop() throws Exception {
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
index 8757793..7184797 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
@@ -96,9 +96,9 @@ class MongoDbChangeStreamsThread extends MongoAbstractConsumerThread {
// it throws exception when cursor is closed in another thread
// there is no way to stop hasNext() before closing cursor
if (keepRunning) {
- throw e;
- } else {
log.debug("Exception from MongoDB, will regenerate cursor.", e);
+ } else {
+ throw e;
}
}
}