You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by zr...@apache.org on 2020/12/16 14:11:50 UTC

[camel] 02/02: CAMEL-12871: disconnect on handshake failure

This is an automated email from the ASF dual-hosted git repository.

zregvart pushed a commit to branch issue/CAMEL-12871
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 4e4a063f41dfcd7c78e37b676bd626b9120632db
Author: Zoran Regvart <zr...@apache.org>
AuthorDate: Wed Dec 16 14:43:13 2020 +0100

    CAMEL-12871: disconnect on handshake failure
    
    If we can't connect and perform the handshake, disconnecting will
    trigger client restart with back-off. Also when restarting as the signal
    to restart can occur on multiple threads we need to guard against
    restart happening in parallel.
---
 .../internal/streaming/SubscriptionHelper.java     | 132 ++++++++++++---------
 1 file changed, 77 insertions(+), 55 deletions(-)

diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index 25e363b..c97fb02 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -22,6 +22,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Stream;
 
@@ -93,6 +94,7 @@ public class SubscriptionHelper extends ServiceSupport {
 
     private volatile boolean reconnecting;
     private final AtomicLong restartBackoff;
+    private final AtomicBoolean restarting = new AtomicBoolean();
 
     public SubscriptionHelper(final SalesforceComponent component) throws SalesforceException {
         this.component = component;
@@ -173,6 +175,7 @@ public class SubscriptionHelper extends ServiceSupport {
                         connectError = (String) message.get(ERROR_FIELD);
                         connectException = getFailure(message);
 
+                        client.disconnect();
                     } else if (reconnecting) {
 
                         reconnecting = false;
@@ -206,6 +209,10 @@ public class SubscriptionHelper extends ServiceSupport {
         }
         client.getChannel(META_DISCONNECT).addListener(disconnectListener);
 
+        connect();
+    }
+
+    private void connect() throws CamelException {
         // connect to Salesforce cometd endpoint
         client.handshake();
 
@@ -229,80 +236,95 @@ public class SubscriptionHelper extends ServiceSupport {
 
     // launch an async task to restart
     private void restartClient() {
+        if (!restarting.compareAndSet(false, true)) {
+            return;
+        }
 
         // launch a new restart command
         final SalesforceHttpClient httpClient = component.getConfig().getHttpClient();
         httpClient.getExecutor().execute(new Runnable() {
             @Override
             public void run() {
+                try {
+                    performClientRestart();
+                } finally {
+                    restarting.set(false);
+                }
+            }
+        });
+    }
 
-                LOG.info("Restarting on unexpected disconnect from Salesforce...");
-                boolean abort = false;
+    private void performClientRestart() {
+        if (isStoppingOrStopped()) {
+            return;
+        }
 
-                // wait for disconnect
-                LOG.debug("Waiting to disconnect...");
-                while (!client.isDisconnected()) {
-                    try {
-                        Thread.sleep(DISCONNECT_INTERVAL);
-                    } catch (InterruptedException e) {
-                        LOG.error("Aborting restart on interrupt!");
-                        abort = true;
-                    }
-                }
+        LOG.info("Restarting on unexpected disconnect from Salesforce...");
+        boolean abort = false;
+
+        // wait for disconnect
+        LOG.debug("Waiting to disconnect...");
+        while (!abort && !client.isDisconnected()) {
+            try {
+                Thread.sleep(DISCONNECT_INTERVAL);
+            } catch (InterruptedException e) {
+                LOG.error("Aborting restart on interrupt!");
+                abort = true;
+            }
 
-                if (!abort) {
+            abort = isStoppingOrStopped();
+        }
 
-                    // update restart attempt backoff
-                    final long backoff = restartBackoff.getAndAdd(backoffIncrement);
-                    if (backoff > maxBackoff) {
-                        LOG.error("Restart aborted after exceeding {} msecs backoff", maxBackoff);
-                        abort = true;
-                    } else {
+        if (!abort) {
 
-                        // pause before restart attempt
-                        LOG.debug("Pausing for {} msecs before restart attempt", backoff);
-                        try {
-                            Thread.sleep(backoff);
-                        } catch (InterruptedException e) {
-                            LOG.error("Aborting restart on interrupt!");
-                            abort = true;
-                        }
-                    }
+            // update restart attempt backoff
+            final long backoff = restartBackoff.getAndAdd(backoffIncrement);
+            if (backoff > maxBackoff) {
+                LOG.error("Restart aborted after exceeding {} msecs backoff", maxBackoff);
+                abort = true;
+            } else {
 
-                    if (!abort) {
-                        Exception lastError = new SalesforceException("Unknown error", null);
-                        try {
-                            // reset client
-                            doStop();
+                // pause before restart attempt
+                LOG.debug("Pausing for {} msecs before restart attempt", backoff);
+                try {
+                    Thread.sleep(backoff);
+                } catch (InterruptedException e) {
+                    LOG.error("Aborting restart on interrupt!");
+                    abort = true;
+                }
+            }
 
-                            // register listeners and restart
-                            doStart();
+            if (!abort) {
+                Exception lastError = new SalesforceException("Unknown error", null);
+                try {
+                    // reset client
+                    doStop();
 
-                        } catch (Exception e) {
-                            LOG.error("Error restarting: " + e.getMessage(), e);
-                            lastError = e;
-                        }
+                    // register listeners and restart
+                    doStart();
 
-                        if (client != null && client.isHandshook()) {
-                            LOG.info("Successfully restarted!");
-                            // reset backoff interval
-                            restartBackoff.set(client.getBackoffIncrement());
-                        } else {
-                            LOG.error("Failed to restart after pausing for {} msecs", backoff);
-                            if ((backoff + backoffIncrement) > maxBackoff) {
-                                // notify all consumers
-                                String abortMsg = "Aborting restart attempt due to: " + lastError.getMessage();
-                                SalesforceException ex = new SalesforceException(abortMsg, lastError);
-                                for (SalesforceConsumer consumer : listenerMap.keySet()) {
-                                    consumer.handleException(abortMsg, ex);
-                                }
-                            }
+                } catch (Exception e) {
+                    LOG.error("Error restarting: " + e.getMessage(), e);
+                    lastError = e;
+                }
+
+                if (client != null && client.isHandshook()) {
+                    LOG.info("Successfully restarted!");
+                    // reset backoff interval
+                    restartBackoff.set(client.getBackoffIncrement());
+                } else {
+                    LOG.error("Failed to restart after pausing for {} msecs", backoff);
+                    if ((backoff + backoffIncrement) > maxBackoff) {
+                        // notify all consumers
+                        String abortMsg = "Aborting restart attempt due to: " + lastError.getMessage();
+                        SalesforceException ex = new SalesforceException(abortMsg, lastError);
+                        for (SalesforceConsumer consumer : listenerMap.keySet()) {
+                            consumer.handleException(abortMsg, ex);
                         }
                     }
                 }
-
             }
-        });
+        }
     }
 
     @SuppressWarnings("unchecked")