You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by je...@apache.org on 2021/11/13 17:49:05 UTC

[camel] branch main updated: camel-salesforce: Streaming API fixes and improvements

This is an automated email from the ASF dual-hosted git repository.

jeremyross pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 7a6315a  camel-salesforce: Streaming API fixes and improvements
7a6315a is described below

commit 7a6315a73569f92a234a948a11cf4fc5478701c0
Author: Jeremy Ross <je...@gmail.com>
AuthorDate: Sat Nov 13 10:27:15 2021 -0600

    camel-salesforce: Streaming API fixes and improvements
    
    * Fixes CAMEL-16370, CAMEL-15203, CAMEL-13170.
    
    * Fix: fails to reconnect due to NPE
    
    * Smarter, more reliable reconnect/handshake logic
    
    * Use Bayeux client's reconnect functionality instead of restarting the whole
      SubscriptionHelper unnecessarily.
    
    * SubscriptionHelper: Avoid using doStart()/doStop() for anything other than
      starting and stopping this service
    
    * Eliminate unnecessary churn, e.g. full restarts, recreating listeners, Bayeux
      client, etc.
    
    * When reconnecting after a lost connection, replayId state is retained instead
      of going back to the endpoint config, which would be stale and result in
      repeated messages. This also applies to replayId values specified in the
      initialReplayIdMap option.
    
    * Ensure listener callbacks always use a different thread in order to avoid
      deadlocks
    
    * Toned down some log levels
    
    * When the auth token expires, attempt to log in (with backoff) indefinitely.
    
    * Don't always expect a response to unsubscribe, because if there are other
      listeners on the channel, the Bayeux client will not send an unsubscribe
      message to the server
---
 .../salesforce/internal/SalesforceSession.java     |   7 +-
 .../internal/streaming/SubscriptionHelper.java     | 358 +++++++++------------
 .../salesforce/StreamingApiIntegrationTest.java    |   7 +-
 .../SubscriptionHelperIntegrationTest.java         |  85 -----
 .../internal/streaming/SubscriptionHelperTest.java |   4 +-
 5 files changed, 168 insertions(+), 293 deletions(-)

diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java
index a600ef1..a0a0c84 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java
@@ -311,10 +311,9 @@ public class SalesforceSession extends ServiceSupport {
             final String reason = logoutResponse.getReason();
 
             if (statusCode == HttpStatus.OK_200) {
-                LOG.info("Logout successful");
+                LOG.debug("Logout successful");
             } else {
-                throw new SalesforceException(
-                        String.format("Logout error, code: [%s] reason: [%s]", statusCode, reason), statusCode);
+                LOG.debug("Failed to revoke OAuth token. This is expected if the token is invalid or already expired");
             }
 
         } catch (InterruptedException e) {
@@ -324,7 +323,7 @@ public class SalesforceSession extends ServiceSupport {
             final Throwable ex = e.getCause();
             throw new SalesforceException("Unexpected logout exception: " + ex.getMessage(), ex);
         } catch (TimeoutException e) {
-            throw new SalesforceException("Logout request TIMEOUT!", null);
+            throw new SalesforceException("Logout request TIMEOUT!", e);
         } finally {
             // reset session
             accessToken = null;
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index ff760f9..969abfb 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -21,7 +21,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Stream;
@@ -50,10 +49,8 @@ import org.slf4j.LoggerFactory;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.cometd.bayeux.Channel.META_CONNECT;
-import static org.cometd.bayeux.Channel.META_DISCONNECT;
 import static org.cometd.bayeux.Channel.META_HANDSHAKE;
 import static org.cometd.bayeux.Channel.META_SUBSCRIBE;
-import static org.cometd.bayeux.Channel.META_UNSUBSCRIBE;
 import static org.cometd.bayeux.Message.ERROR_FIELD;
 import static org.cometd.bayeux.Message.SUBSCRIPTION_FIELD;
 
@@ -64,7 +61,6 @@ public class SubscriptionHelper extends ServiceSupport {
     private static final Logger LOG = LoggerFactory.getLogger(SubscriptionHelper.class);
 
     private static final int CONNECT_TIMEOUT = 110;
-    private static final int CHANNEL_TIMEOUT = 40;
 
     private static final String FAILURE_FIELD = "failure";
     private static final String EXCEPTION_FIELD = "exception";
@@ -72,11 +68,12 @@ public class SubscriptionHelper extends ServiceSupport {
     private static final String FAILURE_REASON_FIELD = "failureReason";
     private static final int DISCONNECT_INTERVAL = 5000;
     private static final String SERVER_TOO_BUSY_ERROR = "503::";
+    public static final String AUTHENTICATION_INVALID = "401::Authentication invalid";
 
     BayeuxClient client;
 
     private final SalesforceComponent component;
-    private final SalesforceSession session;
+    private SalesforceSession session;
     private final long timeout = 60 * 1000L;
 
     private final Map<SalesforceConsumer, ClientSessionChannel.MessageListener> listenerMap;
@@ -85,7 +82,6 @@ public class SubscriptionHelper extends ServiceSupport {
 
     private ClientSessionChannel.MessageListener handshakeListener;
     private ClientSessionChannel.MessageListener connectListener;
-    private ClientSessionChannel.MessageListener disconnectListener;
 
     private volatile String handshakeError;
     private volatile Exception handshakeException;
@@ -93,71 +89,61 @@ public class SubscriptionHelper extends ServiceSupport {
     private volatile Exception connectException;
 
     private volatile boolean reconnecting;
-    private final AtomicLong restartBackoff;
-    private final AtomicBoolean restarting = new AtomicBoolean();
+    private final AtomicLong handshakeBackoff;
+    private final AtomicBoolean handshaking = new AtomicBoolean();
+    private final AtomicBoolean loggingIn = new AtomicBoolean();
 
     public SubscriptionHelper(final SalesforceComponent component) {
         this.component = component;
-        this.session = component.getSession();
-
-        this.listenerMap = new ConcurrentHashMap<>();
-
-        restartBackoff = new AtomicLong();
+        listenerMap = new ConcurrentHashMap<>();
+        handshakeBackoff = new AtomicLong();
         backoffIncrement = component.getConfig().getBackoffIncrement();
         maxBackoff = component.getConfig().getMaxBackoff();
     }
 
     @Override
     protected void doStart() throws Exception {
+        session = component.getSession();
 
         // create CometD client
-        this.client = createClient(component);
+        client = createClient(component, session);
 
-        // reset all error conditions
-        handshakeError = null;
-        handshakeException = null;
-        connectError = null;
-        connectException = null;
+        initMessageListeners();
+        connect();
+    }
 
+    private void initMessageListeners() {
         // listener for handshake error or exception
         if (handshakeListener == null) {
             // first start
             handshakeListener = new ClientSessionChannel.MessageListener() {
                 public void onMessage(ClientSessionChannel channel, Message message) {
-                    LOG.debug("[CHANNEL:META_HANDSHAKE]: {}", message);
-
-                    if (!message.isSuccessful()) {
-                        LOG.warn("Handshake failure: {}", message);
-                        handshakeError = (String) message.get(ERROR_FIELD);
-                        handshakeException = getFailure(message);
+                    component.getHttpClient().getWorkerPool().execute(() -> {
+                        LOG.debug("[CHANNEL:META_HANDSHAKE]: {}", message);
 
-                        if (handshakeError != null) {
-                            // refresh oauth token, if it's a 401 error
-                            if (handshakeError.startsWith("401::")) {
-                                try {
-                                    LOG.info("Refreshing OAuth token...");
-                                    session.login(session.getAccessToken());
-                                    LOG.info("Refreshed OAuth token for re-handshake");
-                                } catch (SalesforceException e) {
-                                    LOG.warn("Error renewing OAuth token on 401 error: " + e.getMessage(), e);
-                                }
-                            }
-                            if (handshakeError.startsWith("403::")) {
-                                try {
-                                    LOG.info("Cleaning session (logout) from SalesforceSession before restarting client");
-                                    session.logout();
-                                } catch (SalesforceException e) {
-                                    LOG.warn("Error while cleaning session: " + e.getMessage(), e);
+                        if (!message.isSuccessful()) {
+                            LOG.warn("Handshake failure: {}", message);
+                            handshakeError = (String) message.get(ERROR_FIELD);
+                            handshakeException = getFailure(message);
+                            if (handshakeError != null) {
+                                if (handshakeError.startsWith("403::")) {
+                                    String failureReason = getFailureReason(message);
+                                    if (failureReason.equals(AUTHENTICATION_INVALID)) {
+                                        LOG.debug(
+                                                "attempting login due to handshake error: 403 -> 401::Authentication invalid");
+                                        attemptLoginUntilSuccessful();
+                                    }
                                 }
                             }
-                        }
 
-                        // restart if handshake fails for any reason
-                        restartClient();
+                            // failed, so keep trying
+                            LOG.debug("Handshake failed, so try again.");
+                            handshake();
 
-                    } else if (!listenerMap.isEmpty()) {
-                        reconnecting = true;
-                    }
+                        } else if (!listenerMap.isEmpty()) {
+                            reconnecting = true;
+                        }
+                    });
                 }
             };
         }
@@ -167,49 +153,45 @@ public class SubscriptionHelper extends ServiceSupport {
         if (connectListener == null) {
             connectListener = new ClientSessionChannel.MessageListener() {
                 public void onMessage(ClientSessionChannel channel, Message message) {
-                    LOG.debug("[CHANNEL:META_CONNECT]: {}", message);
+                    component.getHttpClient().getWorkerPool().execute(() -> {
+                        LOG.debug("[CHANNEL:META_CONNECT]: {}", message);
 
-                    if (!message.isSuccessful()) {
+                        if (!message.isSuccessful()) {
 
-                        LOG.warn("Connect failure: {}", message);
-                        connectError = (String) message.get(ERROR_FIELD);
-                        connectException = getFailure(message);
+                            LOG.warn("Connect failure: {}", message);
+                            connectError = (String) message.get(ERROR_FIELD);
+                            connectException = getFailure(message);
 
-                        client.disconnect();
-                    } else if (reconnecting) {
-
-                        reconnecting = false;
-
-                        LOG.debug("Refreshing subscriptions to {} channels on reconnect", listenerMap.size());
-                        // reconnected to Salesforce, subscribe to existing
-                        // channels
-                        final Map<SalesforceConsumer, ClientSessionChannel.MessageListener> map = new HashMap<>();
-                        map.putAll(listenerMap);
-                        listenerMap.clear();
-                        for (Map.Entry<SalesforceConsumer, ClientSessionChannel.MessageListener> entry : map.entrySet()) {
-                            final SalesforceConsumer consumer = entry.getKey();
-                            final String topicName = consumer.getTopicName();
-                            subscribe(topicName, consumer);
+                            if (connectError != null && connectError.equals(AUTHENTICATION_INVALID)) {
+                                LOG.debug("connectError: " + connectError);
+                                LOG.debug("Attempting login...");
+                                attemptLoginUntilSuccessful();
+                            }
+                            // Server says don't retry to connect, so we'll handshake instead
+                            // Otherwise, Bayeux client automatically re-attempts connection
+                            if (message.getAdvice() != null &&
+                                    !message.getAdvice().get("reconnect").equals("retry")) {
+                                LOG.debug("Advice != retry, so handshaking");
+                                handshake();
+                            }
+                        } else if (reconnecting) {
+                            LOG.debug("Refreshing subscriptions to {} channels on reconnect", listenerMap.size());
+                            // reconnected to Salesforce, subscribe to existing
+                            // channels
+                            final Map<SalesforceConsumer, MessageListener> map = new HashMap<>(listenerMap);
+                            listenerMap.clear();
+                            for (Map.Entry<SalesforceConsumer, ClientSessionChannel.MessageListener> entry : map.entrySet()) {
+                                final SalesforceConsumer consumer = entry.getKey();
+                                final String topicName = consumer.getTopicName();
+                                subscribe(topicName, consumer);
+                            }
+                            reconnecting = false;
                         }
-
-                    }
+                    });
                 }
             };
         }
         client.getChannel(META_CONNECT).addListener(connectListener);
-
-        // handle fatal disconnects by reconnecting asynchronously
-        if (disconnectListener == null) {
-            disconnectListener = new ClientSessionChannel.MessageListener() {
-                @Override
-                public void onMessage(ClientSessionChannel clientSessionChannel, Message message) {
-                    restartClient();
-                }
-            };
-        }
-        client.getChannel(META_DISCONNECT).addListener(disconnectListener);
-
-        connect();
     }
 
     private void connect() throws CamelException {
@@ -234,32 +216,26 @@ public class SubscriptionHelper extends ServiceSupport {
         }
     }
 
-    // launch an async task to restart
-    private void restartClient() {
-        if (!restarting.compareAndSet(false, true)) {
+    private void handshake() {
+        LOG.debug("Begin handshake if not already in progress.");
+        if (!handshaking.compareAndSet(false, true)) {
             return;
         }
 
-        // launch a new restart command
-        final SalesforceHttpClient httpClient = component.getConfig().getHttpClient();
-        httpClient.getExecutor().execute(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    performClientRestart();
-                } finally {
-                    restarting.set(false);
-                }
-            }
-        });
+        LOG.debug("Continuing with handshake.");
+        try {
+            doHandshake();
+        } finally {
+            handshaking.set(false);
+        }
     }
 
-    private void performClientRestart() {
+    private void doHandshake() {
         if (isStoppingOrStopped()) {
             return;
         }
 
-        LOG.info("Restarting on unexpected disconnect from Salesforce...");
+        LOG.info("Handshaking after unexpected disconnect from Salesforce...");
         boolean abort = false;
 
         // wait for disconnect
@@ -268,28 +244,28 @@ public class SubscriptionHelper extends ServiceSupport {
             try {
                 Thread.sleep(DISCONNECT_INTERVAL);
             } catch (InterruptedException e) {
-                LOG.error("Aborting restart on interrupt!");
+                LOG.error("Aborting handshake on interrupt!");
                 abort = true;
             }
 
-            abort = isStoppingOrStopped();
+            abort = abort || isStoppingOrStopped();
         }
 
         if (!abort) {
 
-            // update restart attempt backoff
-            final long backoff = restartBackoff.getAndAdd(backoffIncrement);
+            // update handshake attempt backoff
+            final long backoff = handshakeBackoff.getAndAdd(backoffIncrement);
             if (backoff > maxBackoff) {
-                LOG.error("Restart aborted after exceeding {} msecs backoff", maxBackoff);
+                LOG.error("Handshake aborted after exceeding {} msecs backoff", maxBackoff);
                 abort = true;
             } else {
 
-                // pause before restart attempt
-                LOG.debug("Pausing for {} msecs before restart attempt", backoff);
+                // pause before handshake attempt
+                LOG.debug("Pausing for {} msecs before handshake attempt", backoff);
                 try {
                     Thread.sleep(backoff);
                 } catch (InterruptedException e) {
-                    LOG.error("Aborting restart on interrupt!");
+                    LOG.error("Aborting handshake on interrupt!");
                     abort = true;
                 }
             }
@@ -297,26 +273,36 @@ public class SubscriptionHelper extends ServiceSupport {
             if (!abort) {
                 Exception lastError = new SalesforceException("Unknown error", null);
                 try {
-                    // reset client
-                    doStop();
+                    // reset client. If we fail to stop and logout, catch the exception
+                    // so we can still continue to doStart()
+                    if (client != null) {
+                        client.disconnect();
+                        boolean disconnected = client.waitFor(timeout, State.DISCONNECTED);
+                        if (!disconnected) {
+                            LOG.warn("Could not disconnect client connected to: {} after: {} msec.", getEndpointUrl(component),
+                                    timeout);
+                            client.abort();
+                        }
+                    }
 
-                    // register listeners and restart
-                    doStart();
+                    client.handshake();
+                    final long waitMs = MILLISECONDS.convert(CONNECT_TIMEOUT, SECONDS);
+                    client.waitFor(waitMs, BayeuxClient.State.CONNECTED);
 
                 } catch (Exception e) {
-                    LOG.error("Error restarting: " + e.getMessage(), e);
+                    LOG.error("Error handshaking: " + e.getMessage(), e);
                     lastError = e;
                 }
 
                 if (client != null && client.isHandshook()) {
-                    LOG.info("Successfully restarted!");
+                    LOG.debug("Successful handshake!");
                     // reset backoff interval
-                    restartBackoff.set(client.getBackoffIncrement());
+                    handshakeBackoff.set(client.getBackoffIncrement());
                 } else {
-                    LOG.error("Failed to restart after pausing for {} msecs", backoff);
+                    LOG.error("Failed to handshake after pausing for {} msecs", backoff);
                     if ((backoff + backoffIncrement) > maxBackoff) {
                         // notify all consumers
-                        String abortMsg = "Aborting restart attempt due to: " + lastError.getMessage();
+                        String abortMsg = "Aborting handshake attempt due to: " + lastError.getMessage();
                         SalesforceException ex = new SalesforceException(abortMsg, lastError);
                         for (SalesforceConsumer consumer : listenerMap.keySet()) {
                             consumer.handleException(abortMsg, ex);
@@ -355,7 +341,6 @@ public class SubscriptionHelper extends ServiceSupport {
 
     @Override
     protected void doStop() throws Exception {
-        closeChannel(META_DISCONNECT, disconnectListener);
         closeChannel(META_CONNECT, connectListener);
         closeChannel(META_HANDSHAKE, handshakeListener);
 
@@ -379,12 +364,16 @@ public class SubscriptionHelper extends ServiceSupport {
         }
 
         client = null;
+
+        if (session != null) {
+            session.logout();
+        }
         LOG.debug("Stopped the helper and destroyed the client");
     }
 
-    static BayeuxClient createClient(final SalesforceComponent component) throws SalesforceException {
-        // use default Jetty client from SalesforceComponent, its shared by all
-        // consumers
+    static BayeuxClient createClient(final SalesforceComponent component, final SalesforceSession session)
+            throws SalesforceException {
+        // use default Jetty client from SalesforceComponent, it's shared by all consumers
         final SalesforceHttpClient httpClient = component.getConfig().getHttpClient();
 
         Map<String, Object> options = new HashMap<>();
@@ -393,7 +382,6 @@ public class SubscriptionHelper extends ServiceSupport {
             options = component.getLongPollingTransportProperties();
         }
 
-        final SalesforceSession session = component.getSession();
         // check login access token
         if (session.getAccessToken() == null && !component.getLoginConfig().isLazyLogin()) {
             session.login(null);
@@ -429,7 +417,9 @@ public class SubscriptionHelper extends ServiceSupport {
         // create subscription for consumer
         final String channelName = getChannelName(topicName);
 
-        setupReplay((SalesforceEndpoint) consumer.getEndpoint());
+        if (!reconnecting) {
+            setupReplay((SalesforceEndpoint) consumer.getEndpoint());
+        }
 
         // channel message listener
         LOG.info("Subscribing to channel {}...", channelName);
@@ -441,11 +431,8 @@ public class SubscriptionHelper extends ServiceSupport {
                 // convert CometD message to Camel Message
                 consumer.processMessage(channel, message);
             }
-
         };
 
-        final ClientSessionChannel clientChannel = client.getChannel(channelName);
-
         // listener for subscription
         final ClientSessionChannel.MessageListener subscriptionListener = new ClientSessionChannel.MessageListener() {
             public void onMessage(ClientSessionChannel channel, Message message) {
@@ -468,7 +455,7 @@ public class SubscriptionHelper extends ServiceSupport {
                             LOG.warn(msg);
 
                             // retry after delay
-                            final long backoff = restartBackoff.getAndAdd(backoffIncrement);
+                            final long backoff = handshakeBackoff.getAndAdd(backoffIncrement);
                             if (backoff > maxBackoff) {
                                 LOG.error("Subscribe aborted after exceeding {} msecs backoff", maxBackoff);
                             } else {
@@ -478,13 +465,7 @@ public class SubscriptionHelper extends ServiceSupport {
                                     LOG.debug("Pausing for {} msecs before subscribe attempt", backoff);
                                     Thread.sleep(backoff);
 
-                                    final SalesforceHttpClient httpClient = component.getConfig().getHttpClient();
-                                    httpClient.getExecutor().execute(new Runnable() {
-                                        @Override
-                                        public void run() {
-                                            subscribe(topicName, consumer);
-                                        }
-                                    });
+                                    component.getHttpClient().getWorkerPool().execute(() -> subscribe(topicName, consumer));
                                 } catch (InterruptedException e) {
                                     LOG.warn("Aborting subscribe on interrupt!", e);
                                 }
@@ -500,7 +481,7 @@ public class SubscriptionHelper extends ServiceSupport {
                         listenerMap.put(consumer, listener);
 
                         // reset backoff interval
-                        restartBackoff.set(0);
+                        handshakeBackoff.set(0);
                     }
 
                     // remove this subscription listener
@@ -509,13 +490,13 @@ public class SubscriptionHelper extends ServiceSupport {
                     } else {
                         LOG.warn("Trying to handle a subscription message but the client is already destroyed");
                     }
-
                 }
             }
         };
         client.getChannel(META_SUBSCRIBE).addListener(subscriptionListener);
 
         // subscribe asynchronously
+        final ClientSessionChannel clientChannel = client.getChannel(channelName);
         clientChannel.subscribe(listener);
     }
 
@@ -527,6 +508,7 @@ public class SubscriptionHelper extends ServiceSupport {
     private static String getFailureReason(Message message) {
         String failureReason = null;
         if (message.getExt() != null) {
+            @SuppressWarnings("unchecked")
             Map<String, Object> sfdcFields = (Map<String, Object>) message.getExt().get(SFDC_FIELD);
             if (sfdcFields != null) {
                 failureReason = (String) sfdcFields.get(FAILURE_REASON_FIELD);
@@ -550,6 +532,37 @@ public class SubscriptionHelper extends ServiceSupport {
         }
     }
 
+    private void attemptLoginUntilSuccessful() {
+        if (!loggingIn.compareAndSet(false, true)) {
+            LOG.debug("already logging in");
+            return;
+        }
+
+        long backoff = 0;
+
+        try {
+            for (;;) {
+                try {
+                    session.login(session.getAccessToken());
+                    break;
+                } catch (SalesforceException e) {
+                    backoff = backoff + backoffIncrement;
+                    if (backoff > maxBackoff) {
+                        backoff = maxBackoff;
+                    }
+                    LOG.warn(String.format("Salesforce login failed. Pausing for %d seconds", backoff), e);
+                    try {
+                        Thread.sleep(backoff);
+                    } catch (InterruptedException ex) {
+                        throw new RuntimeException("Failed to login.", ex);
+                    }
+                }
+            }
+        } finally {
+            loggingIn.set(false);
+        }
+    }
+
     static Optional<Long> determineReplayIdFor(final SalesforceEndpoint endpoint, final String topicName) {
         final String channelName = getChannelName(topicName);
 
@@ -570,7 +583,7 @@ public class SubscriptionHelper extends ServiceSupport {
         final Long componentDefaultReplayId = componentConfiguration.getDefaultReplayId();
 
         // the endpoint values have priority over component values, and the
-        // default values posteriority
+        // default values priority
         // over give topic values
         return Stream.of(replayId, endpointReplayId, componentReplayId, endpointDefaultReplayId, componentDefaultReplayId)
                 .filter(Objects::nonNull).findFirst();
@@ -592,76 +605,26 @@ public class SubscriptionHelper extends ServiceSupport {
         return channelName.toString();
     }
 
-    public void unsubscribe(String topicName, SalesforceConsumer consumer) throws CamelException {
+    public void unsubscribe(String topicName, SalesforceConsumer consumer) {
 
         // channel name
         final String channelName = getChannelName(topicName);
 
-        // listen for unsubscribe error
-        final CountDownLatch latch = new CountDownLatch(1);
-        final String[] unsubscribeError = { null };
-        final Exception[] unsubscribeFailure = { null };
+        // unsubscribe from channel
+        final ClientSessionChannel.MessageListener listener = listenerMap.remove(consumer);
+        if (listener != null) {
 
-        final ClientSessionChannel.MessageListener unsubscribeListener = new ClientSessionChannel.MessageListener() {
-            public void onMessage(ClientSessionChannel channel, Message message) {
-                LOG.debug("[CHANNEL:META_UNSUBSCRIBE]: {}", message);
-                Object subscription = message.get(SUBSCRIPTION_FIELD);
-                if (subscription != null) {
-                    String unsubscribedChannelName = subscription.toString();
-                    if (channelName.equals(unsubscribedChannelName)) {
-
-                        if (!message.isSuccessful()) {
-                            unsubscribeError[0] = (String) message.get(ERROR_FIELD);
-                            unsubscribeFailure[0] = getFailure(message);
-                        } else {
-                            // forget subscription
-                            LOG.info("Unsubscribed from channel {}", unsubscribedChannelName);
-                        }
-                        latch.countDown();
-                    }
-                }
-            }
-        };
-        client.getChannel(META_UNSUBSCRIBE).addListener(unsubscribeListener);
-
-        try {
-            // unsubscribe from channel
-            final ClientSessionChannel.MessageListener listener = listenerMap.remove(consumer);
-            if (listener != null) {
-
-                LOG.info("Unsubscribing from channel {}...", channelName);
-                final ClientSessionChannel clientChannel = client.getChannel(channelName);
-                clientChannel.unsubscribe(listener);
-
-                // confirm unsubscribe
-                try {
-                    if (!latch.await(CHANNEL_TIMEOUT, SECONDS)) {
-                        String message;
-                        if (unsubscribeFailure[0] != null) {
-                            message = String.format("Error unsubscribing from topic %s: %s", topicName,
-                                    unsubscribeFailure[0].getMessage());
-                        } else if (unsubscribeError[0] != null) {
-                            message = String.format("Error unsubscribing from topic %s: %s", topicName, unsubscribeError[0]);
-                        } else {
-                            message = String.format("Timeout error unsubscribing from topic %s after %s seconds", topicName,
-                                    CHANNEL_TIMEOUT);
-                        }
-                        throw new CamelException(message, unsubscribeFailure[0]);
-                    }
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    // probably shutting down, forget unsubscribe and return
-                }
-
-            }
-        } finally {
-            client.getChannel(META_UNSUBSCRIBE).removeListener(unsubscribeListener);
+            LOG.debug("Unsubscribing from channel {}...", channelName);
+            final ClientSessionChannel clientChannel = client.getChannel(channelName);
+            // if there are other listeners on this channel, an unsubscribe message will not be sent,
+            // so we're not going to listen for and expect an unsub response. Just unsub and move on.
+            clientChannel.unsubscribe(listener);
         }
     }
 
     static String getEndpointUrl(final SalesforceComponent component) {
         // In version 36.0 replay is only enabled on a separate endpoint
-        if (Double.valueOf(component.getConfig().getApiVersion()) == 36.0) {
+        if (Double.parseDouble(component.getConfig().getApiVersion()) == 36.0) {
             boolean replayOptionsPresent = component.getConfig().getDefaultReplayId() != null
                     || !component.getConfig().getInitialReplayIdMap().isEmpty();
             if (replayOptionsPresent) {
@@ -670,5 +633,4 @@ public class SubscriptionHelper extends ServiceSupport {
         }
         return component.getSession().getInstanceUrl() + "/cometd/" + component.getConfig().getApiVersion();
     }
-
 }
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/StreamingApiIntegrationTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/StreamingApiIntegrationTest.java
index a4c35b2..6cf6210 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/StreamingApiIntegrationTest.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/StreamingApiIntegrationTest.java
@@ -29,7 +29,6 @@ import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @Tag("standalone")
@@ -81,13 +80,13 @@ public class StreamingApiIntegrationTest extends AbstractSalesforceTestBase {
 
         } finally {
             // remove the test record
-            assertNull(template().requestBody("direct:deleteSObjectWithId", merchandise));
+            template().requestBody("direct:deleteSObjectWithId", merchandise);
 
             // remove the test topic
             // find it using SOQL first
             QueryRecordsPushTopic records = template().requestBody("direct:query", null, QueryRecordsPushTopic.class);
             assertEquals(1, records.getTotalSize(), "Test topic not found");
-            assertNull(template().requestBody("direct:deleteSObject", records.getRecords().get(0)));
+            template().requestBody("direct:deleteSObject", records.getRecords().get(0));
 
         }
     }
@@ -110,7 +109,7 @@ public class StreamingApiIntegrationTest extends AbstractSalesforceTestBase {
                      + "updateTopic=true&sObjectQuery=SELECT Id, Name FROM Merchandise__c").to("mock:RawPayloadCamelTestTopic");
 
                 // route for creating test record
-                from("direct:upsertSObject").to("salesforce:upsertSObject?SObjectIdName=Name");
+                from("direct:upsertSObject").to("salesforce:upsertSObject?sObjectIdName=Name");
 
                 // route for finding test topic
                 from("direct:query")
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java
index 8c01631..5105595 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java
@@ -37,7 +37,6 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.TestInstance.Lifecycle;
 import org.mockito.ArgumentMatcher;
-import org.mockito.Mockito;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelperIntegrationTest.MessageArgumentMatcher.messageForAccountCreationWithName;
@@ -203,90 +202,6 @@ public class SubscriptionHelperIntegrationTest {
     }
 
     @Test
-    void shouldResubscribeOnConnectionFailures() throws InterruptedException {
-        // handshake and connect
-        subscription.start();
-
-        final SalesforceConsumer consumer
-                = toUnsubscribe = mock(SalesforceConsumer.class, "shouldResubscribeOnConnectionFailures:consumer");
-
-        final SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class, "shouldResubscribeOnConnectionFailures:endpoint");
-
-        // subscribe
-        when(consumer.getTopicName()).thenReturn("Account");
-
-        when(consumer.getEndpoint()).thenReturn(endpoint);
-        when(endpoint.getConfiguration()).thenReturn(config);
-        when(endpoint.getComponent()).thenReturn(salesforce);
-        when(endpoint.getTopicName()).thenReturn("Account");
-
-        subscription.subscribe("Account", consumer);
-
-        // push one message so we know connection is established and consumer
-        // receives notifications
-        messages.add("[\n"
-                     + "  {\n"
-                     + "    \"data\": {\n"
-                     + "      \"event\": {\n"
-                     + "        \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n"
-                     + "        \"replayId\": 1,\n"
-                     + "        \"type\": \"created\"\n"
-                     + "      },\n"
-                     + "      \"sobject\": {\n"
-                     + "        \"Id\": \"0011n00002XWMgVAAX\",\n"
-                     + "        \"Name\": \"shouldResubscribeOnConnectionFailures 1\"\n"
-                     + "      }\n"
-                     + "    },\n"
-                     + "    \"channel\": \"/topic/Account\"\n"
-                     + "  },\n"
-                     + "  {\n"
-                     + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
-                     + "    \"channel\": \"/meta/connect\",\n"
-                     + "    \"id\": \"$id\",\n"
-                     + "    \"successful\": true\n"
-                     + "  }\n"
-                     + "]");
-
-        verify(consumer, Mockito.timeout(100)).processMessage(any(ClientSessionChannel.class),
-                messageForAccountCreationWithName("shouldResubscribeOnConnectionFailures 1"));
-
-        // terminate server abruptly by closing the connection (sends FIN, ACK)
-        server.abruptlyRestart();
-
-        // queue next message for when the client recovers
-        messages.add("[\n"
-                     + "  {\n"
-                     + "    \"data\": {\n"
-                     + "      \"event\": {\n"
-                     + "        \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n"
-                     + "        \"replayId\": 2,\n"
-                     + "        \"type\": \"created\"\n"
-                     + "      },\n"
-                     + "      \"sobject\": {\n"
-                     + "        \"Id\": \"0011n00002XWMgVAAX\",\n"
-                     + "        \"Name\": \"shouldResubscribeOnConnectionFailures 2\"\n"
-                     + "      }\n"
-                     + "    },\n"
-                     + "    \"channel\": \"/topic/Account\"\n"
-                     + "  },\n"
-                     + "  {\n"
-                     + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
-                     + "    \"channel\": \"/meta/connect\",\n"
-                     + "    \"id\": \"$id\",\n"
-                     + "    \"successful\": true\n"
-                     + "  }\n"
-                     + "]");
-
-        // assert last message was received, recovery can take a bit
-        verify(consumer, timeout(10000)).processMessage(any(ClientSessionChannel.class),
-                messageForAccountCreationWithName("shouldResubscribeOnConnectionFailures 2"));
-
-        verify(consumer, atLeastOnce()).getEndpoint();
-        verify(consumer, atLeastOnce()).getTopicName();
-        verifyNoMoreInteractions(consumer);
-    }
-
-    @Test
     void shouldResubscribeOnHelperRestart() {
         // handshake and connect
         subscription.start();
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
index 1ffd1a1..5670219 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
@@ -138,7 +138,7 @@ public class SubscriptionHelperTest {
         when(component.getConfig()).thenReturn(endpointConfig);
         when(component.getSession()).thenReturn(session);
 
-        BayeuxClient bayeuxClient = SubscriptionHelper.createClient(component);
+        BayeuxClient bayeuxClient = SubscriptionHelper.createClient(component, session);
 
         assertNotNull(bayeuxClient);
         verify(session, never()).login(null);
@@ -161,7 +161,7 @@ public class SubscriptionHelperTest {
         when(component.getConfig()).thenReturn(endpointConfig);
         when(component.getSession()).thenReturn(session);
 
-        BayeuxClient bayeuxClient = SubscriptionHelper.createClient(component);
+        BayeuxClient bayeuxClient = SubscriptionHelper.createClient(component, session);
 
         assertNotNull(bayeuxClient);
         verify(session).login(null);