You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by zr...@apache.org on 2020/12/16 14:11:49 UTC
[camel] 01/02: CAMEL-12871: release resources on stop
This is an automated email from the ASF dual-hosted git repository.
zregvart pushed a commit to branch issue/CAMEL-12871
in repository https://gitbox.apache.org/repos/asf/camel.git
commit d22c3a40e871b76cfbd9f0028861905e1249aa84
Author: Zoran Regvart <zr...@apache.org>
AuthorDate: Wed Dec 16 14:41:17 2020 +0100
CAMEL-12871: release resources on stop
When SubscriptionHelper is stopped we need to remove all listeners and
close channels this helper is listening on.
---
.../internal/streaming/SubscriptionHelper.java | 29 +++++++++++++++++++---
1 file changed, 26 insertions(+), 3 deletions(-)
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index 625b2f2..25e363b 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -36,6 +36,7 @@ import org.apache.camel.component.salesforce.internal.SalesforceSession;
import org.apache.camel.support.service.ServiceSupport;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSessionChannel;
+import org.cometd.bayeux.client.ClientSessionChannel.MessageListener;
import org.cometd.client.BayeuxClient;
import org.cometd.client.BayeuxClient.State;
import org.cometd.client.transport.ClientTransport;
@@ -320,11 +321,33 @@ public class SubscriptionHelper extends ServiceSupport {
return exception;
}
+ private void closeChannel(final String name, MessageListener listener) {
+ if (client == null) {
+ return;
+ }
+
+ final ClientSessionChannel channel = client.getChannel(name);
+ channel.removeListener(listener);
+ channel.release();
+ }
+
@Override
protected void doStop() throws Exception {
- client.getChannel(META_DISCONNECT).removeListener(disconnectListener);
- client.getChannel(META_CONNECT).removeListener(connectListener);
- client.getChannel(META_HANDSHAKE).removeListener(handshakeListener);
+ closeChannel(META_DISCONNECT, disconnectListener);
+ closeChannel(META_CONNECT, connectListener);
+ closeChannel(META_HANDSHAKE, handshakeListener);
+
+ for (Map.Entry<SalesforceConsumer, MessageListener> entry : listenerMap.entrySet()) {
+ final SalesforceConsumer consumer = entry.getKey();
+ final String topic = consumer.getTopicName();
+
+ final MessageListener listener = entry.getValue();
+ closeChannel(getChannelName(topic), listener);
+ }
+
+ if (client == null) {
+ return;
+ }
client.disconnect();
boolean disconnected = client.waitFor(timeout, State.DISCONNECTED);