You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by zr...@apache.org on 2020/12/14 12:54:42 UTC

[camel] 02/03: CAMEL-12871: release resources on stop (WIP)

This is an automated email from the ASF dual-hosted git repository.

zregvart pushed a commit to branch issue/CAMEL-12871
in repository https://gitbox.apache.org/repos/asf/camel.git

commit a5d16c02c9c77a65945258a4b5576262323cfd8b
Author: Zoran Regvart <zr...@apache.org>
AuthorDate: Mon Dec 14 13:53:50 2020 +0100

    CAMEL-12871: release resources on stop (WIP)
    
    When SubscriptionHelper is stopped, we need to remove all listeners and
    close the channels this helper is listening on.
---
 .../internal/streaming/SubscriptionHelper.java      | 21 ++++++++++++++++++---
 1 file changed, 18 insertions(+), 3 deletions(-)

diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index 625b2f2..e8c3360 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -36,6 +36,7 @@ import org.apache.camel.component.salesforce.internal.SalesforceSession;
 import org.apache.camel.support.service.ServiceSupport;
 import org.cometd.bayeux.Message;
 import org.cometd.bayeux.client.ClientSessionChannel;
+import org.cometd.bayeux.client.ClientSessionChannel.MessageListener;
 import org.cometd.client.BayeuxClient;
 import org.cometd.client.BayeuxClient.State;
 import org.cometd.client.transport.ClientTransport;
@@ -320,11 +321,25 @@ public class SubscriptionHelper extends ServiceSupport {
         return exception;
     }
 
+    private void closeChannel(final String name, MessageListener listener) {
+        final ClientSessionChannel channel = client.getChannel(name);
+        channel.removeListener(listener);
+        channel.release();
+    }
+
     @Override
     protected void doStop() throws Exception {
-        client.getChannel(META_DISCONNECT).removeListener(disconnectListener);
-        client.getChannel(META_CONNECT).removeListener(connectListener);
-        client.getChannel(META_HANDSHAKE).removeListener(handshakeListener);
+        closeChannel(META_DISCONNECT, disconnectListener);
+        closeChannel(META_CONNECT, connectListener);
+        closeChannel(META_HANDSHAKE, handshakeListener);
+
+        for (Map.Entry<SalesforceConsumer, MessageListener> entry : listenerMap.entrySet()) {
+            final SalesforceConsumer consumer = entry.getKey();
+            final String topic = consumer.getTopicName();
+
+            final MessageListener listener = entry.getValue();
+            closeChannel(getChannelName(topic), listener);
+        }
 
         client.disconnect();
         boolean disconnected = client.waitFor(timeout, State.DISCONNECTED);