You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2016/05/12 11:56:27 UTC
camel git commit: CAMEL-9962: Add a field in the consumer to define
if it is subscribed to the topic or not
Repository: camel
Updated Branches:
refs/heads/master e904bf498 -> 5582508a7
CAMEL-9962: Add a field in the consumer to define if it is subscribed to the topic or not
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5582508a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5582508a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5582508a
Branch: refs/heads/master
Commit: 5582508a74fb67f806c5a5dcf58705e99295a62a
Parents: e904bf4
Author: Andrea Cosentino <an...@gmail.com>
Authored: Thu May 12 13:54:36 2016 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Thu May 12 13:55:34 2016 +0200
----------------------------------------------------------------------
.../apache/camel/component/nats/NatsConsumer.java | 17 ++++++++++++++++-
1 file changed, 16 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/5582508a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
index d0abb36..8fc2eff 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
@@ -42,6 +42,7 @@ public class NatsConsumer extends DefaultConsumer {
private ExecutorService executor;
private Connection connection;
private Subscription sid;
+ private boolean subscribed;
public NatsConsumer(NatsEndpoint endpoint, Processor processor) {
super(endpoint, processor);
@@ -101,6 +102,14 @@ public class NatsConsumer extends DefaultConsumer {
return connection;
}
+ public boolean isSubscribed() {
+ return subscribed;
+ }
+
+ public void setSubscribed(boolean subscribed) {
+ this.subscribed = subscribed;
+ }
+
class NatsConsumingTask implements Runnable {
private final Connection connection;
@@ -133,6 +142,9 @@ public class NatsConsumer extends DefaultConsumer {
if (ObjectHelper.isNotEmpty(getEndpoint().getNatsConfiguration().getMaxMessages())) {
sid.autoUnsubscribe(Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages()));
}
+ if (sid.isValid()) {
+ setSubscribed(true);
+ }
} else {
sid = connection.subscribe(getEndpoint().getNatsConfiguration().getTopic(), new MessageHandler() {
@Override
@@ -151,7 +163,10 @@ public class NatsConsumer extends DefaultConsumer {
});
if (ObjectHelper.isNotEmpty(getEndpoint().getNatsConfiguration().getMaxMessages())) {
sid.autoUnsubscribe(Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages()));
- }
+ }
+ if (sid.isValid()) {
+ setSubscribed(true);
+ }
}
} catch (Throwable e) {
getExceptionHandler().handleException("Error during processing", e);