You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/04/08 13:41:47 UTC
[1/3] camel git commit: CAMEL-11122: camel-reactive-streams - Add
more JMX information
Repository: camel
Updated Branches:
refs/heads/master db7cd8e1d -> 028e810bb
CAMEL-11122: camel-reactive-streams - Add more JMX information
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/028e810b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/028e810b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/028e810b
Branch: refs/heads/master
Commit: 028e810bbc9656e6c8d8ba7b29281efffa6a5de5
Parents: bfb151b
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Apr 8 15:28:32 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Apr 8 15:41:40 2017 +0200
----------------------------------------------------------------------
.../streams/ReactiveStreamsConsumer.java | 24 +----
.../api/CamelReactiveStreamsService.java | 1 -
.../reactive/streams/engine/CamelPublisher.java | 6 +-
.../engine/CamelReactiveStreamsServiceImpl.java | 101 ++++++++++++++++++-
.../streams/engine/CamelSubscriber.java | 9 ++
.../streams/engine/CamelSubscription.java | 4 +
6 files changed, 116 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/028e810b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
index 8661200..3724585 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
@@ -21,11 +21,8 @@ import java.util.concurrent.ExecutorService;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.api.management.ManagedAttribute;
-import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
-import org.apache.camel.component.reactive.streams.engine.CamelSubscriber;
import org.apache.camel.impl.DefaultConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,7 +30,6 @@ import org.slf4j.LoggerFactory;
/**
* The Camel reactive-streams consumer.
*/
-@ManagedResource(description = "Managed ReactiveStreamsConsumer")
public class ReactiveStreamsConsumer extends DefaultConsumer {
private static final Logger LOG = LoggerFactory.getLogger(ReactiveStreamsConsumer.class);
@@ -44,8 +40,6 @@ public class ReactiveStreamsConsumer extends DefaultConsumer {
private CamelReactiveStreamsService service;
- private volatile CamelSubscriber subscriber;
-
public ReactiveStreamsConsumer(ReactiveStreamsEndpoint endpoint, Processor processor) {
super(endpoint, processor);
this.endpoint = endpoint;
@@ -62,14 +56,13 @@ public class ReactiveStreamsConsumer extends DefaultConsumer {
executor = getEndpoint().getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, getEndpoint().getEndpointUri(), poolSize);
}
- this.subscriber = this.service.attachCamelConsumer(endpoint.getStream(), this);
+ this.service.attachCamelConsumer(endpoint.getStream(), this);
}
@Override
protected void doStop() throws Exception {
super.doStop();
this.service.detachCamelConsumer(endpoint.getStream());
- this.subscriber = null;
if (executor != null) {
endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);
@@ -128,19 +121,4 @@ public class ReactiveStreamsConsumer extends DefaultConsumer {
return endpoint;
}
- @ManagedAttribute(description = "Number of inflight messages")
- public long getInflightCount() {
- return subscriber != null ? subscriber.getInflightCount() : 0;
- }
-
- @ManagedAttribute(description = "Number of messages to be requested on next request")
- public long getToBeRequested() {
- return subscriber != null ? subscriber.getRequested() : 0;
- }
-
- @ManagedAttribute(description = "Number of pending messages in the buffer")
- public long getBufferSize() {
- return subscriber != null ? subscriber.getBufferSize() : 0;
- }
-
}
http://git-wip-us.apache.org/repos/asf/camel/blob/028e810b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
index bdd316e..91b16d4 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
@@ -20,7 +20,6 @@ import java.util.function.Function;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
-import org.apache.camel.Service;
import org.apache.camel.StaticService;
import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
http://git-wip-us.apache.org/repos/asf/camel/blob/028e810b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
index 5cafcd2..f90f19c 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
@@ -78,7 +78,7 @@ public class CamelPublisher implements Publisher<StreamPayload<Exchange>>, AutoC
DispatchCallback<Exchange> originalCallback = data.getCallback();
if (originalCallback != null && subs.size() > 0) {
// When multiple subscribers have an active subscription,
- // we aknowledge the exchange once it has been delivered to every
+ // we acknowledge the exchange once it has been delivered to every
// subscriber (or their subscription is cancelled)
AtomicInteger counter = new AtomicInteger(subs.size());
// Use just the first exception in the callback when multiple exceptions are thrown
@@ -131,4 +131,8 @@ public class CamelPublisher implements Publisher<StreamPayload<Exchange>>, AutoC
}
subscriptions.clear();
}
+
+ public int getSubscriptionSize() {
+ return subscriptions.size();
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/028e810b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
index 1abbd94..c3ccc42 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
@@ -20,10 +20,21 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
+import org.apache.camel.api.management.ManagedOperation;
+import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent;
import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
@@ -36,13 +47,16 @@ import org.apache.camel.component.reactive.streams.util.MonoPublisher;
import org.apache.camel.component.reactive.streams.util.UnwrapStreamProcessor;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.spi.Synchronization;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.ObjectHelper;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
/**
* The default implementation of the reactive streams service.
*/
-public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsService {
+@ManagedResource(description = "Managed CamelReactiveStreamsService")
+public class CamelReactiveStreamsServiceImpl extends ServiceSupport implements CamelReactiveStreamsService {
private CamelContext context;
@@ -60,16 +74,17 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
}
@Override
- public void start() throws Exception {
+ protected void doStart() throws Exception {
ReactiveStreamsComponent component = context.getComponent("reactive-streams", ReactiveStreamsComponent.class);
ReactiveStreamsEngineConfiguration config = component.getInternalEngineConfiguration();
this.workerPool = context.getExecutorServiceManager().newThreadPool(this, config.getThreadPoolName(), config.getThreadPoolMinSize(), config.getThreadPoolMaxSize());
}
@Override
- public void stop() throws Exception {
+ protected void doStop() throws Exception {
if (this.workerPool != null) {
context.getExecutorServiceManager().shutdownNow(this.workerPool);
+ this.workerPool = null;
}
}
@@ -249,7 +264,6 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
return data -> to(uri, data, type);
}
-
@Override
public void process(String uri, Function<? super Publisher<Exchange>, ?> processor) {
try {
@@ -320,4 +334,83 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
return exchange;
}
+ @ManagedOperation(description = "Information about Camel Reactive subscribers")
+ public TabularData camelSubscribers() {
+ try {
+ final TabularData answer = new TabularDataSupport(subscriptionsTabularType());
+
+ subscribers.forEach((k, v) -> {
+ try {
+ String name = k;
+ long inflight = v.getInflightCount();
+ long requested = v.getRequested();
+ long bufferSize = v.getBufferSize();
+ String backpressure = v.getBackpressureStrategy() != null ? v.getBackpressureStrategy().name() : "";
+
+ CompositeType ct = subscriptionsCompositeType();
+ CompositeData data = new CompositeDataSupport(ct,
+ new String[] {"name", "inflight", "requested", "buffer size", "back pressure"},
+ new Object[] {name, inflight, requested, bufferSize, backpressure});
+ answer.put(data);
+ } catch (Exception e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
+ });
+
+ return answer;
+ } catch (Exception e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
+ }
+
+ @ManagedOperation(description = "Information about Camel Reactive publishers")
+ public TabularData camelPublishers() {
+ try {
+ final TabularData answer = new TabularDataSupport(publishersTabularType());
+
+ publishers.forEach((k, v) -> {
+ try {
+ String name = k;
+ int subscribers = v.getSubscriptionSize();
+
+ CompositeType ct = publishersCompositeType();
+ CompositeData data = new CompositeDataSupport(ct,
+ new String[] {"name", "subscribers"},
+ new Object[] {name, subscribers});
+ answer.put(data);
+ } catch (Exception e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
+ });
+
+ return answer;
+ } catch (Exception e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
+ }
+
+ private static CompositeType subscriptionsCompositeType() throws OpenDataException {
+ return new CompositeType("subscriptions", "Subscriptions",
+ new String[] {"name", "inflight", "requested", "buffer size", "back pressure"},
+ new String[] {"Name", "Inflight", "Requested", "Buffer Size", "Back Pressure"},
+ new OpenType[] {SimpleType.STRING, SimpleType.LONG, SimpleType.LONG, SimpleType.LONG, SimpleType.STRING});
+ }
+
+ private static TabularType subscriptionsTabularType() throws OpenDataException {
+ CompositeType ct = subscriptionsCompositeType();
+ return new TabularType("subscriptions", "Information about Camel Reactive subscribers", ct, new String[]{"name"});
+ }
+
+ private static CompositeType publishersCompositeType() throws OpenDataException {
+ return new CompositeType("publishers", "Publishers",
+ new String[] {"name", "subscribers"},
+ new String[] {"Name", "Subscribers"},
+ new OpenType[] {SimpleType.STRING, SimpleType.INTEGER});
+ }
+
+ private static TabularType publishersTabularType() throws OpenDataException {
+ CompositeType ct = publishersCompositeType();
+ return new TabularType("publishers", "Information about Camel Reactive publishers", ct, new String[]{"name"});
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/028e810b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
index 4a232df..dba42f0 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
@@ -20,6 +20,7 @@ import java.io.Closeable;
import java.io.IOException;
import org.apache.camel.Exchange;
+import org.apache.camel.component.reactive.streams.ReactiveStreamsBackpressureStrategy;
import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
@@ -207,4 +208,12 @@ public class CamelSubscriber implements Subscriber<Exchange>, Closeable {
return 0;
}
}
+
+ public ReactiveStreamsBackpressureStrategy getBackpressureStrategy() {
+ if (subscription != null && subscription instanceof CamelSubscription) {
+ return ((CamelSubscription) subscription).getBackpressureStrategy();
+ } else {
+ return null;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/028e810b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
index 37b6670..431ca6d 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
@@ -248,4 +248,8 @@ public class CamelSubscription implements Subscription {
public long getBufferSize() {
return buffer.size();
}
+
+ public ReactiveStreamsBackpressureStrategy getBackpressureStrategy() {
+ return backpressureStrategy;
+ }
}
[3/3] camel git commit: CAMEL-11122: camel-reactive-streams - Add
more JMX information
Posted by da...@apache.org.
CAMEL-11122: camel-reactive-streams - Add more JMX information
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5b9cf7c8
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5b9cf7c8
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5b9cf7c8
Branch: refs/heads/master
Commit: 5b9cf7c8b298c08fbec2564c70ecb3d078f6b4ab
Parents: db7cd8e1
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Apr 8 14:30:17 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Apr 8 15:41:40 2017 +0200
----------------------------------------------------------------------
.../component/reactive/streams/ReactiveStreamsEndpoint.java | 8 ++++++++
1 file changed, 8 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/5b9cf7c8/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java
index e7edf8a..461913b 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java
@@ -19,6 +19,8 @@ package org.apache.camel.component.reactive.streams;
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
@@ -29,6 +31,7 @@ import org.apache.camel.spi.UriPath;
*/
@UriEndpoint(firstVersion = "2.19.0", scheme = "reactive-streams", title = "Reactive Streams", syntax = "reactive-streams:stream",
consumerClass = ReactiveStreamsConsumer.class, label = "reactive,streams")
+@ManagedResource(description = "Managed ReactiveStreamsEndpoint")
public class ReactiveStreamsEndpoint extends DefaultEndpoint {
@UriPath
@@ -71,6 +74,7 @@ public class ReactiveStreamsEndpoint extends DefaultEndpoint {
return new ReactiveStreamsConsumer(this, processor);
}
+ @ManagedAttribute(description = "Name of the stream channel used by the endpoint to exchange messages")
public String getStream() {
return stream;
}
@@ -82,6 +86,7 @@ public class ReactiveStreamsEndpoint extends DefaultEndpoint {
this.stream = stream;
}
+ @ManagedAttribute(description = "Maximum number of exchanges concurrently being processed by Camel")
public Integer getMaxInflightExchanges() {
return maxInflightExchanges;
}
@@ -102,6 +107,7 @@ public class ReactiveStreamsEndpoint extends DefaultEndpoint {
/**
* Number of threads used to process exchanges in the Camel route.
*/
+ @ManagedAttribute(description = "Number of threads used to process exchanges in the Camel route")
public void setConcurrentConsumers(int concurrentConsumers) {
this.concurrentConsumers = concurrentConsumers;
}
@@ -128,6 +134,7 @@ public class ReactiveStreamsEndpoint extends DefaultEndpoint {
this.backpressureStrategy = backpressureStrategy;
}
+ @ManagedAttribute(description = "Determines if onComplete events should be pushed to the Camel route")
public boolean isForwardOnComplete() {
return forwardOnComplete;
}
@@ -139,6 +146,7 @@ public class ReactiveStreamsEndpoint extends DefaultEndpoint {
this.forwardOnComplete = forwardOnComplete;
}
+ @ManagedAttribute(description = "Determines if onError events should be pushed to the Camel route")
public boolean isForwardOnError() {
return forwardOnError;
}
[2/3] camel git commit: CAMEL-11122: camel-reactive-streams - Add
more JMX information
Posted by da...@apache.org.
CAMEL-11122: camel-reactive-streams - Add more JMX information
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bfb151b0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bfb151b0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bfb151b0
Branch: refs/heads/master
Commit: bfb151b08a108e7314ea7e75c283ce167ec428d8
Parents: 5b9cf7c
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Apr 8 14:47:40 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Apr 8 15:41:40 2017 +0200
----------------------------------------------------------------------
.../streams/ReactiveStreamsConsumer.java | 25 +++++++++++++++++++-
.../api/CamelReactiveStreamsService.java | 7 ++++--
.../engine/CamelReactiveStreamsServiceImpl.java | 6 +++--
.../streams/engine/CamelSubscriber.java | 17 +++++++++++--
.../streams/engine/CamelSubscription.java | 4 ++++
.../support/ReactiveStreamsTestService.java | 5 ++--
6 files changed, 55 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/bfb151b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
index 190fa6c..8661200 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
@@ -21,8 +21,11 @@ import java.util.concurrent.ExecutorService;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
+import org.apache.camel.component.reactive.streams.engine.CamelSubscriber;
import org.apache.camel.impl.DefaultConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,6 +33,7 @@ import org.slf4j.LoggerFactory;
/**
* The Camel reactive-streams consumer.
*/
+@ManagedResource(description = "Managed ReactiveStreamsConsumer")
public class ReactiveStreamsConsumer extends DefaultConsumer {
private static final Logger LOG = LoggerFactory.getLogger(ReactiveStreamsConsumer.class);
@@ -40,6 +44,8 @@ public class ReactiveStreamsConsumer extends DefaultConsumer {
private CamelReactiveStreamsService service;
+ private volatile CamelSubscriber subscriber;
+
public ReactiveStreamsConsumer(ReactiveStreamsEndpoint endpoint, Processor processor) {
super(endpoint, processor);
this.endpoint = endpoint;
@@ -56,13 +62,14 @@ public class ReactiveStreamsConsumer extends DefaultConsumer {
executor = getEndpoint().getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, getEndpoint().getEndpointUri(), poolSize);
}
- this.service.attachCamelConsumer(endpoint.getStream(), this);
+ this.subscriber = this.service.attachCamelConsumer(endpoint.getStream(), this);
}
@Override
protected void doStop() throws Exception {
super.doStop();
this.service.detachCamelConsumer(endpoint.getStream());
+ this.subscriber = null;
if (executor != null) {
endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);
@@ -120,4 +127,20 @@ public class ReactiveStreamsConsumer extends DefaultConsumer {
public ReactiveStreamsEndpoint getEndpoint() {
return endpoint;
}
+
+ @ManagedAttribute(description = "Number of inflight messages")
+ public long getInflightCount() {
+ return subscriber != null ? subscriber.getInflightCount() : 0;
+ }
+
+ @ManagedAttribute(description = "Number of messages to be requested on next request")
+ public long getToBeRequested() {
+ return subscriber != null ? subscriber.getRequested() : 0;
+ }
+
+ @ManagedAttribute(description = "Number of pending messages in the buffer")
+ public long getBufferSize() {
+ return subscriber != null ? subscriber.getBufferSize() : 0;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/bfb151b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
index cbd9fda..bdd316e 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
@@ -21,15 +21,17 @@ import java.util.function.Function;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
import org.apache.camel.Service;
+import org.apache.camel.StaticService;
import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
+import org.apache.camel.component.reactive.streams.engine.CamelSubscriber;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
/**
* The interface to which any implementation of the reactive-streams engine should comply.
*/
-public interface CamelReactiveStreamsService extends CamelContextAware, Service {
+public interface CamelReactiveStreamsService extends CamelContextAware, StaticService {
/*
* Main API methods.
@@ -295,9 +297,10 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
*
* @param name the stream name
* @param consumer the consumer of the route
+ * @return the associated subscriber
* @throws IllegalStateException if another consumer is already associated with the given stream name
*/
- void attachCamelConsumer(String name, ReactiveStreamsConsumer consumer);
+ CamelSubscriber attachCamelConsumer(String name, ReactiveStreamsConsumer consumer);
/**
* Used by Camel to detach the existing consumer from the given stream.
http://git-wip-us.apache.org/repos/asf/camel/blob/bfb151b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
index 15a10e4..1abbd94 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
@@ -276,8 +276,10 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
}
@Override
- public void attachCamelConsumer(String name, ReactiveStreamsConsumer consumer) {
- streamSubscriber(name).attachConsumer(consumer);
+ public CamelSubscriber attachCamelConsumer(String name, ReactiveStreamsConsumer consumer) {
+ CamelSubscriber subscriber = streamSubscriber(name);
+ subscriber.attachConsumer(consumer);
+ return subscriber;
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/bfb151b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
index 098d0ca..4a232df 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
@@ -19,10 +19,8 @@ package org.apache.camel.component.reactive.streams.engine;
import java.io.Closeable;
import java.io.IOException;
-import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
-import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
@@ -194,4 +192,19 @@ public class CamelSubscriber implements Subscriber<Exchange>, Closeable {
}
}
+ public long getRequested() {
+ return requested;
+ }
+
+ public long getInflightCount() {
+ return inflightCount;
+ }
+
+ public long getBufferSize() {
+ if (subscription != null && subscription instanceof CamelSubscription) {
+ return ((CamelSubscription) subscription).getBufferSize();
+ } else {
+ return 0;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/bfb151b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
index 16c4eea..37b6670 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
@@ -244,4 +244,8 @@ public class CamelSubscription implements Subscription {
this.backpressureStrategy = backpressureStrategy;
mutex.unlock();
}
+
+ public long getBufferSize() {
+ return buffer.size();
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/bfb151b0/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
index 995c218..3f8ed0c 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
@@ -24,6 +24,7 @@ import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.apache.camel.component.reactive.streams.api.DispatchCallback;
+import org.apache.camel.component.reactive.streams.engine.CamelSubscriber;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
@@ -87,8 +88,8 @@ public class ReactiveStreamsTestService implements CamelReactiveStreamsService {
}
@Override
- public void attachCamelConsumer(String name, ReactiveStreamsConsumer consumer) {
-
+ public CamelSubscriber attachCamelConsumer(String name, ReactiveStreamsConsumer consumer) {
+ return null;
}
@Override