You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/06/13 07:20:41 UTC
[camel] branch camel-2.x updated:
"CAMEL-13407:CouchDbChangesetTracker fails silently on network error and
does not recover"
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch camel-2.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.x by this push:
new d48a7ea "CAMEL-13407:CouchDbChangesetTracker fails silently on network error and does not recover"
d48a7ea is described below
commit d48a7ea49aeb19a513c5013f9e1646fc1cd448f2
Author: Ramu <kk...@redhat.com>
AuthorDate: Mon Jun 3 19:33:25 2019 +0530
"CAMEL-13407:CouchDbChangesetTracker fails silently on network error and does not recover"
---
.../component/couchdb/CouchDbChangesetTracker.java | 98 +++++++++++++++++-----
1 file changed, 77 insertions(+), 21 deletions(-)
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java
index 6857aa7..0090c27 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java
@@ -20,6 +20,7 @@ import com.google.gson.JsonObject;
import org.apache.camel.Exchange;
import org.lightcouch.Changes;
import org.lightcouch.ChangesResult;
+import org.lightcouch.CouchDbException;
import org.lightcouch.CouchDbInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,6 +28,7 @@ import org.slf4j.LoggerFactory;
public class CouchDbChangesetTracker implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(CouchDbChangesetTracker.class);
+ private static final int MAX_DB_ERROR_REPEATS = 8;
private volatile boolean stopped;
private final CouchDbClientWrapper couchClient;
@@ -38,46 +40,100 @@ public class CouchDbChangesetTracker implements Runnable {
this.endpoint = endpoint;
this.consumer = consumer;
this.couchClient = couchClient;
- initChanges();
+ initChanges(null);
}
- void initChanges() {
- String since = endpoint.getSince();
- if (since == null) {
+ private void initChanges(final String sequence) {
+ String since = sequence;
+ if (null == since) {
CouchDbInfo dbInfo = couchClient.context().info();
since = dbInfo.getUpdateSeq(); // get latest update seq
- LOG.debug("Last sequence [{}]", since);
}
+ LOG.debug("Last sequence [{}]", since);
changes = couchClient.changes().style(endpoint.getStyle()).includeDocs(true)
.since(since).heartBeat(endpoint.getHeartbeat()).continuousChanges();
}
- @Override
public void run() {
- while (changes.hasNext()) { // blocks until a feed is received
- ChangesResult.Row feed = changes.next();
- if (feed.isDeleted() && !endpoint.isDeletes()) {
- continue;
- }
- if (!feed.isDeleted() && !endpoint.isUpdates()) {
- continue;
+
+ String lastSequence = null;
+
+ try {
+ while (!stopped) {
+
+ try {
+ while (changes.hasNext()) { // blocks until a feed is received
+ ChangesResult.Row feed = changes.next();
+ if (feed.isDeleted() && !endpoint.isDeletes()) {
+ continue;
+ }
+ if (!feed.isDeleted() && !endpoint.isUpdates()) {
+ continue;
+ }
+
+ lastSequence = feed.getSeq();
+ JsonObject doc = feed.getDoc();
+
+ Exchange exchange = endpoint.createExchange(lastSequence, feed.getId(), doc, feed.isDeleted());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Created exchange [exchange={}, _id={}, seq={}", new Object[]{exchange, feed.getId(), lastSequence});
+ }
+
+ try {
+ consumer.getProcessor().process(exchange);
+ } catch (Exception e) {
+ consumer.getExceptionHandler().handleException("Error processing exchange.", exchange, e);
+ }
+ }
+
+ stopped = true;
+
+ } catch (CouchDbException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("CouchDb Exception encountered waiting for changes! Attempting to recover...", e);
+ }
+ if (!waitForStability(lastSequence)) {
+ throw e;
+ }
+ }
}
+ } catch (Exception e) {
+ LOG.error("Unexpected error causing CouchDb change tracker to exit!", e);
+ }
+ }
+
+ private boolean waitForStability(final String lastSequence) {
- String seq = feed.getSeq();
- JsonObject doc = feed.getDoc();
+ boolean problems = true;
+ int repeatDbErrorCount = 0;
- Exchange exchange = endpoint.createExchange(seq, feed.getId(), doc, feed.isDeleted());
- if (LOG.isTraceEnabled()) {
- LOG.trace("Created exchange [exchange={}, _id={}, seq={}", exchange, feed.getId(), seq);
+ while (problems) {
+ if (++repeatDbErrorCount > MAX_DB_ERROR_REPEATS) {
+ LOG.error("CouchDb change set listener fatal error! Retry attempts exceeded, listener must exit.");
+ return false;
}
try {
- consumer.getProcessor().process(exchange);
+ Thread.sleep((int) ((Math.random() * 2000) + 5000)); // <2000ms,5000ms)
+ } catch (InterruptedException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("CouchDb change set listener interrupted waiting for stability!!", e);
+ }
+ }
+ try {
+ // Fail fast operation
+ couchClient.context().serverVersion();
+ // reset change listener
+ initChanges(lastSequence);
+ problems = false;
+
} catch (Exception e) {
- consumer.getExceptionHandler().handleException("Error processing exchange.", exchange, e);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Failed to get CouchDb server version and/or reset change listener! Attempt: " + repeatDbErrorCount, e);
+ }
}
}
- stopped = true;
+ return true;
}
public void stop() {