You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/06/13 07:23:22 UTC

[camel] branch camel-2.23.x updated: "CAMEL-13407:CouchDbChangesetTracker fails silently on network error and does not recover"

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-2.23.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.23.x by this push:
     new 716ea1b  "CAMEL-13407:CouchDbChangesetTracker fails silently on network error and does not recover"
716ea1b is described below

commit 716ea1ba3f71db69fec33213e6dffaf1d2004f75
Author: Ramu <kk...@redhat.com>
AuthorDate: Mon Jun 3 19:33:25 2019 +0530

    "CAMEL-13407:CouchDbChangesetTracker fails silently on network error and does not recover"
---
 .../component/couchdb/CouchDbChangesetTracker.java | 98 +++++++++++++++++-----
 1 file changed, 77 insertions(+), 21 deletions(-)

diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java
index 6857aa7..0090c27 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java
@@ -20,6 +20,7 @@ import com.google.gson.JsonObject;
 import org.apache.camel.Exchange;
 import org.lightcouch.Changes;
 import org.lightcouch.ChangesResult;
+import org.lightcouch.CouchDbException;
 import org.lightcouch.CouchDbInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,6 +28,7 @@ import org.slf4j.LoggerFactory;
 public class CouchDbChangesetTracker implements Runnable {
 
     private static final Logger LOG = LoggerFactory.getLogger(CouchDbChangesetTracker.class);
+    private static final int MAX_DB_ERROR_REPEATS = 8;
 
     private volatile boolean stopped;
     private final CouchDbClientWrapper couchClient;
@@ -38,46 +40,100 @@ public class CouchDbChangesetTracker implements Runnable {
         this.endpoint = endpoint;
         this.consumer = consumer;
         this.couchClient = couchClient;
-        initChanges();
+        initChanges(null);
     }
 
-    void initChanges() {
-        String since = endpoint.getSince();
-        if (since == null) {
+    private void initChanges(final String sequence) {
+        String since = sequence;
+        if (null == since) {
             CouchDbInfo dbInfo = couchClient.context().info();
             since = dbInfo.getUpdateSeq(); // get latest update seq
-            LOG.debug("Last sequence [{}]", since);
         }
+        LOG.debug("Last sequence [{}]", since);
         changes = couchClient.changes().style(endpoint.getStyle()).includeDocs(true)
                 .since(since).heartBeat(endpoint.getHeartbeat()).continuousChanges();
     }
 
-    @Override
     public void run() {
-        while (changes.hasNext()) { // blocks until a feed is received
-            ChangesResult.Row feed = changes.next();
-            if (feed.isDeleted() && !endpoint.isDeletes()) {
-                continue;
-            }
-            if (!feed.isDeleted() && !endpoint.isUpdates()) {
-                continue;
+
+        String lastSequence = null;
+
+        try {
+            while (!stopped) {
+
+                try {
+                    while (changes.hasNext()) { // blocks until a feed is received
+                        ChangesResult.Row feed = changes.next();
+                        if (feed.isDeleted() && !endpoint.isDeletes()) {
+                            continue;
+                        }
+                        if (!feed.isDeleted() && !endpoint.isUpdates()) {
+                            continue;
+                        }
+
+                        lastSequence = feed.getSeq();
+                        JsonObject doc = feed.getDoc();
+
+                        Exchange exchange = endpoint.createExchange(lastSequence, feed.getId(), doc, feed.isDeleted());
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("Created exchange [exchange={}, _id={}, seq={}", new Object[]{exchange, feed.getId(), lastSequence});
+                        }
+
+                        try {
+                            consumer.getProcessor().process(exchange);
+                        } catch (Exception e) {
+                            consumer.getExceptionHandler().handleException("Error processing exchange.", exchange, e);
+                        }
+                    }
+
+                    stopped = true;
+
+                } catch (CouchDbException e) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("CouchDb Exception encountered waiting for changes!  Attempting to recover...", e);
+                    }
+                    if (!waitForStability(lastSequence)) { 
+                        throw e;
+                    }
+                }
             }
+        } catch (Exception e) {
+            LOG.error("Unexpected error causing CouchDb change tracker to exit!", e);
+        }
+    }
+
+    private boolean waitForStability(final String lastSequence) {
 
-            String seq = feed.getSeq();
-            JsonObject doc = feed.getDoc();
+        boolean problems = true;
+        int repeatDbErrorCount = 0;
 
-            Exchange exchange = endpoint.createExchange(seq, feed.getId(), doc, feed.isDeleted());
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Created exchange [exchange={}, _id={}, seq={}", exchange, feed.getId(), seq);
+        while (problems) {
+            if (++repeatDbErrorCount > MAX_DB_ERROR_REPEATS) {
+                LOG.error("CouchDb change set listener fatal error!  Retry attempts exceeded, listener must exit.");
+                return false;
             }
 
             try {
-                consumer.getProcessor().process(exchange);
+                Thread.sleep((int) ((Math.random() * 2000) + 5000)); // <2000ms,5000ms)
+            } catch (InterruptedException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("CouchDb change set listener interrupted waiting for stability!!", e);
+                }
+            }
+            try {
+                // Fail fast operation
+                couchClient.context().serverVersion();
+                // reset change listener
+                initChanges(lastSequence);
+                problems = false;
+
             } catch (Exception e) {
-                consumer.getExceptionHandler().handleException("Error processing exchange.", exchange, e);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Failed to get CouchDb server version and/or reset change listener!  Attempt: " + repeatDbErrorCount, e);
+                }
             }
         }
-        stopped = true;
+        return true;
     }
 
     public void stop() {