You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by zr...@apache.org on 2020/12/16 14:11:50 UTC
[camel] 02/02: CAMEL-12871: disconnect on handshake failure
This is an automated email from the ASF dual-hosted git repository.
zregvart pushed a commit to branch issue/CAMEL-12871
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 4e4a063f41dfcd7c78e37b676bd626b9120632db
Author: Zoran Regvart <zr...@apache.org>
AuthorDate: Wed Dec 16 14:43:13 2020 +0100
CAMEL-12871: disconnect on handshake failure
If we can't connect and perform the handshake, disconnecting will
trigger a client restart with back-off. Also, because the signal to
restart can arrive on multiple threads, we need to guard against
restarts happening in parallel.
---
.../internal/streaming/SubscriptionHelper.java | 132 ++++++++++++---------
1 file changed, 77 insertions(+), 55 deletions(-)
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index 25e363b..c97fb02 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -22,6 +22,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
@@ -93,6 +94,7 @@ public class SubscriptionHelper extends ServiceSupport {
private volatile boolean reconnecting;
private final AtomicLong restartBackoff;
+ private final AtomicBoolean restarting = new AtomicBoolean();
public SubscriptionHelper(final SalesforceComponent component) throws SalesforceException {
this.component = component;
@@ -173,6 +175,7 @@ public class SubscriptionHelper extends ServiceSupport {
connectError = (String) message.get(ERROR_FIELD);
connectException = getFailure(message);
+ client.disconnect();
} else if (reconnecting) {
reconnecting = false;
@@ -206,6 +209,10 @@ public class SubscriptionHelper extends ServiceSupport {
}
client.getChannel(META_DISCONNECT).addListener(disconnectListener);
+ connect();
+ }
+
+ private void connect() throws CamelException {
// connect to Salesforce cometd endpoint
client.handshake();
@@ -229,80 +236,95 @@ public class SubscriptionHelper extends ServiceSupport {
// launch an async task to restart
private void restartClient() {
+ if (!restarting.compareAndSet(false, true)) {
+ return;
+ }
// launch a new restart command
final SalesforceHttpClient httpClient = component.getConfig().getHttpClient();
httpClient.getExecutor().execute(new Runnable() {
@Override
public void run() {
+ try {
+ performClientRestart();
+ } finally {
+ restarting.set(false);
+ }
+ }
+ });
+ }
- LOG.info("Restarting on unexpected disconnect from Salesforce...");
- boolean abort = false;
+ private void performClientRestart() {
+ if (isStoppingOrStopped()) {
+ return;
+ }
- // wait for disconnect
- LOG.debug("Waiting to disconnect...");
- while (!client.isDisconnected()) {
- try {
- Thread.sleep(DISCONNECT_INTERVAL);
- } catch (InterruptedException e) {
- LOG.error("Aborting restart on interrupt!");
- abort = true;
- }
- }
+ LOG.info("Restarting on unexpected disconnect from Salesforce...");
+ boolean abort = false;
+
+ // wait for disconnect
+ LOG.debug("Waiting to disconnect...");
+ while (!abort && !client.isDisconnected()) {
+ try {
+ Thread.sleep(DISCONNECT_INTERVAL);
+ } catch (InterruptedException e) {
+ LOG.error("Aborting restart on interrupt!");
+ abort = true;
+ }
- if (!abort) {
+ abort = isStoppingOrStopped();
+ }
- // update restart attempt backoff
- final long backoff = restartBackoff.getAndAdd(backoffIncrement);
- if (backoff > maxBackoff) {
- LOG.error("Restart aborted after exceeding {} msecs backoff", maxBackoff);
- abort = true;
- } else {
+ if (!abort) {
- // pause before restart attempt
- LOG.debug("Pausing for {} msecs before restart attempt", backoff);
- try {
- Thread.sleep(backoff);
- } catch (InterruptedException e) {
- LOG.error("Aborting restart on interrupt!");
- abort = true;
- }
- }
+ // update restart attempt backoff
+ final long backoff = restartBackoff.getAndAdd(backoffIncrement);
+ if (backoff > maxBackoff) {
+ LOG.error("Restart aborted after exceeding {} msecs backoff", maxBackoff);
+ abort = true;
+ } else {
- if (!abort) {
- Exception lastError = new SalesforceException("Unknown error", null);
- try {
- // reset client
- doStop();
+ // pause before restart attempt
+ LOG.debug("Pausing for {} msecs before restart attempt", backoff);
+ try {
+ Thread.sleep(backoff);
+ } catch (InterruptedException e) {
+ LOG.error("Aborting restart on interrupt!");
+ abort = true;
+ }
+ }
- // register listeners and restart
- doStart();
+ if (!abort) {
+ Exception lastError = new SalesforceException("Unknown error", null);
+ try {
+ // reset client
+ doStop();
- } catch (Exception e) {
- LOG.error("Error restarting: " + e.getMessage(), e);
- lastError = e;
- }
+ // register listeners and restart
+ doStart();
- if (client != null && client.isHandshook()) {
- LOG.info("Successfully restarted!");
- // reset backoff interval
- restartBackoff.set(client.getBackoffIncrement());
- } else {
- LOG.error("Failed to restart after pausing for {} msecs", backoff);
- if ((backoff + backoffIncrement) > maxBackoff) {
- // notify all consumers
- String abortMsg = "Aborting restart attempt due to: " + lastError.getMessage();
- SalesforceException ex = new SalesforceException(abortMsg, lastError);
- for (SalesforceConsumer consumer : listenerMap.keySet()) {
- consumer.handleException(abortMsg, ex);
- }
- }
+ } catch (Exception e) {
+ LOG.error("Error restarting: " + e.getMessage(), e);
+ lastError = e;
+ }
+
+ if (client != null && client.isHandshook()) {
+ LOG.info("Successfully restarted!");
+ // reset backoff interval
+ restartBackoff.set(client.getBackoffIncrement());
+ } else {
+ LOG.error("Failed to restart after pausing for {} msecs", backoff);
+ if ((backoff + backoffIncrement) > maxBackoff) {
+ // notify all consumers
+ String abortMsg = "Aborting restart attempt due to: " + lastError.getMessage();
+ SalesforceException ex = new SalesforceException(abortMsg, lastError);
+ for (SalesforceConsumer consumer : listenerMap.keySet()) {
+ consumer.handleException(abortMsg, ex);
}
}
}
-
}
- });
+ }
}
@SuppressWarnings("unchecked")