You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@camel.apache.org by da...@apache.org on 2017/04/08 13:41:47 UTC

[1/3] camel git commit: CAMEL-11122: camel-reactive-streams - Add more JMX information

Repository: camel
Updated Branches:
  refs/heads/master db7cd8e1d -> 028e810bb


CAMEL-11122: camel-reactive-streams - Add more JMX information


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/028e810b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/028e810b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/028e810b

Branch: refs/heads/master
Commit: 028e810bbc9656e6c8d8ba7b29281efffa6a5de5
Parents: bfb151b
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Apr 8 15:28:32 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Apr 8 15:41:40 2017 +0200

----------------------------------------------------------------------
 .../streams/ReactiveStreamsConsumer.java        |  24 +----
 .../api/CamelReactiveStreamsService.java        |   1 -
 .../reactive/streams/engine/CamelPublisher.java |   6 +-
 .../engine/CamelReactiveStreamsServiceImpl.java | 101 ++++++++++++++++++-
 .../streams/engine/CamelSubscriber.java         |   9 ++
 .../streams/engine/CamelSubscription.java       |   4 +
 6 files changed, 116 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/028e810b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
index 8661200..3724585 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
@@ -21,11 +21,8 @@ import java.util.concurrent.ExecutorService;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.api.management.ManagedAttribute;
-import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
 import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
-import org.apache.camel.component.reactive.streams.engine.CamelSubscriber;
 import org.apache.camel.impl.DefaultConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,7 +30,6 @@ import org.slf4j.LoggerFactory;
 /**
  * The Camel reactive-streams consumer.
  */
-@ManagedResource(description = "Managed ReactiveStreamsConsumer")
 public class ReactiveStreamsConsumer extends DefaultConsumer {
 
     private static final Logger LOG = LoggerFactory.getLogger(ReactiveStreamsConsumer.class);
@@ -44,8 +40,6 @@ public class ReactiveStreamsConsumer extends DefaultConsumer {
 
     private CamelReactiveStreamsService service;
 
-    private volatile CamelSubscriber subscriber;
-
     public ReactiveStreamsConsumer(ReactiveStreamsEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
         this.endpoint = endpoint;
@@ -62,14 +56,13 @@ public class ReactiveStreamsConsumer extends DefaultConsumer {
             executor = getEndpoint().getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, getEndpoint().getEndpointUri(), poolSize);
         }
 
-        this.subscriber = this.service.attachCamelConsumer(endpoint.getStream(), this);
+        this.service.attachCamelConsumer(endpoint.getStream(), this);
     }
 
     @Override
     protected void doStop() throws Exception {
         super.doStop();
         this.service.detachCamelConsumer(endpoint.getStream());
-        this.subscriber = null;
 
         if (executor != null) {
             endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);
@@ -128,19 +121,4 @@ public class ReactiveStreamsConsumer extends DefaultConsumer {
         return endpoint;
     }
 
-    @ManagedAttribute(description = "Number of inflight messages")
-    public long getInflightCount() {
-        return subscriber != null ? subscriber.getInflightCount() : 0;
-    }
-
-    @ManagedAttribute(description = "Number of messages to be requested on next request")
-    public long getToBeRequested() {
-        return subscriber != null ? subscriber.getRequested() : 0;
-    }
-
-    @ManagedAttribute(description = "Number of pending messages in the buffer")
-    public long getBufferSize() {
-        return subscriber != null ? subscriber.getBufferSize() : 0;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/028e810b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
index bdd316e..91b16d4 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
@@ -20,7 +20,6 @@ import java.util.function.Function;
 
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
-import org.apache.camel.Service;
 import org.apache.camel.StaticService;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;

http://git-wip-us.apache.org/repos/asf/camel/blob/028e810b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
index 5cafcd2..f90f19c 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
@@ -78,7 +78,7 @@ public class CamelPublisher implements Publisher<StreamPayload<Exchange>>, AutoC
         DispatchCallback<Exchange> originalCallback = data.getCallback();
         if (originalCallback != null && subs.size() > 0) {
             // When multiple subscribers have an active subscription,
-            // we aknowledge the exchange once it has been delivered to every
+            // we acknowledge the exchange once it has been delivered to every
             // subscriber (or their subscription is cancelled)
             AtomicInteger counter = new AtomicInteger(subs.size());
             // Use just the first exception in the callback when multiple exceptions are thrown
@@ -131,4 +131,8 @@ public class CamelPublisher implements Publisher<StreamPayload<Exchange>>, AutoC
         }
         subscriptions.clear();
     }
+
+    public int getSubscriptionSize() {
+        return subscriptions.size();
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/028e810b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
index 1abbd94..c3ccc42 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
@@ -20,10 +20,21 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.api.management.ManagedOperation;
+import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
@@ -36,13 +47,16 @@ import org.apache.camel.component.reactive.streams.util.MonoPublisher;
 import org.apache.camel.component.reactive.streams.util.UnwrapStreamProcessor;
 import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.spi.Synchronization;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.ObjectHelper;
 import org.reactivestreams.Publisher;
 import org.reactivestreams.Subscriber;
 
 /**
  * The default implementation of the reactive streams service.
  */
-public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsService {
+@ManagedResource(description = "Managed CamelReactiveStreamsService")
+public class CamelReactiveStreamsServiceImpl extends ServiceSupport implements CamelReactiveStreamsService {
 
     private CamelContext context;
 
@@ -60,16 +74,17 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
     }
 
     @Override
-    public void start() throws Exception {
+    protected void doStart() throws Exception {
         ReactiveStreamsComponent component = context.getComponent("reactive-streams", ReactiveStreamsComponent.class);
         ReactiveStreamsEngineConfiguration config = component.getInternalEngineConfiguration();
         this.workerPool = context.getExecutorServiceManager().newThreadPool(this, config.getThreadPoolName(), config.getThreadPoolMinSize(), config.getThreadPoolMaxSize());
     }
 
     @Override
-    public void stop() throws Exception {
+    protected void doStop() throws Exception {
         if (this.workerPool != null) {
             context.getExecutorServiceManager().shutdownNow(this.workerPool);
+            this.workerPool = null;
         }
     }
 
@@ -249,7 +264,6 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
         return data -> to(uri, data, type);
     }
 
-
     @Override
     public void process(String uri, Function<? super Publisher<Exchange>, ?> processor) {
         try {
@@ -320,4 +334,83 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
         return exchange;
     }
 
+    @ManagedOperation(description = "Information about Camel Reactive subscribers")
+    public TabularData camelSubscribers() {
+        try {
+            final TabularData answer = new TabularDataSupport(subscriptionsTabularType());
+
+            subscribers.forEach((k, v) -> {
+                try {
+                    String name = k;
+                    long inflight = v.getInflightCount();
+                    long requested = v.getRequested();
+                    long bufferSize = v.getBufferSize();
+                    String backpressure = v.getBackpressureStrategy() != null ? v.getBackpressureStrategy().name() : "";
+
+                    CompositeType ct = subscriptionsCompositeType();
+                    CompositeData data = new CompositeDataSupport(ct,
+                        new String[] {"name", "inflight", "requested", "buffer size", "back pressure"},
+                        new Object[] {name, inflight, requested, bufferSize, backpressure});
+                    answer.put(data);
+                } catch (Exception e) {
+                    throw ObjectHelper.wrapRuntimeCamelException(e);
+                }
+            });
+
+            return answer;
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+    }
+
+    @ManagedOperation(description = "Information about Camel Reactive publishers")
+    public TabularData camelPublishers() {
+        try {
+            final TabularData answer = new TabularDataSupport(publishersTabularType());
+
+            publishers.forEach((k, v) -> {
+                try {
+                    String name = k;
+                    int subscribers = v.getSubscriptionSize();
+
+                    CompositeType ct = publishersCompositeType();
+                    CompositeData data = new CompositeDataSupport(ct,
+                        new String[] {"name", "subscribers"},
+                        new Object[] {name, subscribers});
+                    answer.put(data);
+                } catch (Exception e) {
+                    throw ObjectHelper.wrapRuntimeCamelException(e);
+                }
+            });
+
+            return answer;
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+    }
+
+    private static CompositeType subscriptionsCompositeType() throws OpenDataException {
+        return new CompositeType("subscriptions", "Subscriptions",
+            new String[] {"name", "inflight", "requested", "buffer size", "back pressure"},
+            new String[] {"Name", "Inflight", "Requested", "Buffer Size", "Back Pressure"},
+            new OpenType[] {SimpleType.STRING, SimpleType.LONG, SimpleType.LONG, SimpleType.LONG, SimpleType.STRING});
+    }
+
+    private static TabularType subscriptionsTabularType() throws OpenDataException {
+        CompositeType ct = subscriptionsCompositeType();
+        return new TabularType("subscriptions", "Information about Camel Reactive subscribers", ct, new String[]{"name"});
+    }
+
+    private static CompositeType publishersCompositeType() throws OpenDataException {
+        return new CompositeType("publishers", "Publishers",
+            new String[] {"name", "subscribers"},
+            new String[] {"Name", "Subscribers"},
+            new OpenType[] {SimpleType.STRING, SimpleType.INTEGER});
+    }
+
+    private static TabularType publishersTabularType() throws OpenDataException {
+        CompositeType ct = publishersCompositeType();
+        return new TabularType("publishers", "Information about Camel Reactive publishers", ct, new String[]{"name"});
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/028e810b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
index 4a232df..dba42f0 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
@@ -20,6 +20,7 @@ import java.io.Closeable;
 import java.io.IOException;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.component.reactive.streams.ReactiveStreamsBackpressureStrategy;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
 import org.reactivestreams.Subscriber;
 import org.reactivestreams.Subscription;
@@ -207,4 +208,12 @@ public class CamelSubscriber implements Subscriber<Exchange>, Closeable {
             return 0;
         }
     }
+
+    public ReactiveStreamsBackpressureStrategy getBackpressureStrategy() {
+        if (subscription != null && subscription instanceof CamelSubscription) {
+            return ((CamelSubscription) subscription).getBackpressureStrategy();
+        } else {
+            return null;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/028e810b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
index 37b6670..431ca6d 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
@@ -248,4 +248,8 @@ public class CamelSubscription implements Subscription {
     public long getBufferSize() {
         return buffer.size();
     }
+
+    public ReactiveStreamsBackpressureStrategy getBackpressureStrategy() {
+        return backpressureStrategy;
+    }
 }


[3/3] camel git commit: CAMEL-11122: camel-reactive-streams - Add more JMX information

Posted by da...@apache.org.
CAMEL-11122: camel-reactive-streams - Add more JMX information


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5b9cf7c8
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5b9cf7c8
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5b9cf7c8

Branch: refs/heads/master
Commit: 5b9cf7c8b298c08fbec2564c70ecb3d078f6b4ab
Parents: db7cd8e1
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Apr 8 14:30:17 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Apr 8 15:41:40 2017 +0200

----------------------------------------------------------------------
 .../component/reactive/streams/ReactiveStreamsEndpoint.java  | 8 ++++++++
 1 file changed, 8 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/5b9cf7c8/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java
index e7edf8a..461913b 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java
@@ -19,6 +19,8 @@ package org.apache.camel.component.reactive.streams;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
@@ -29,6 +31,7 @@ import org.apache.camel.spi.UriPath;
  */
 @UriEndpoint(firstVersion = "2.19.0", scheme = "reactive-streams", title = "Reactive Streams", syntax = "reactive-streams:stream",
         consumerClass = ReactiveStreamsConsumer.class, label = "reactive,streams")
+@ManagedResource(description = "Managed ReactiveStreamsEndpoint")
 public class ReactiveStreamsEndpoint extends DefaultEndpoint {
 
     @UriPath
@@ -71,6 +74,7 @@ public class ReactiveStreamsEndpoint extends DefaultEndpoint {
         return new ReactiveStreamsConsumer(this, processor);
     }
 
+    @ManagedAttribute(description = "Name of the stream channel used by the endpoint to exchange messages")
     public String getStream() {
         return stream;
     }
@@ -82,6 +86,7 @@ public class ReactiveStreamsEndpoint extends DefaultEndpoint {
         this.stream = stream;
     }
 
+    @ManagedAttribute(description = "Maximum number of exchanges concurrently being processed by Camel")
     public Integer getMaxInflightExchanges() {
         return maxInflightExchanges;
     }
@@ -102,6 +107,7 @@ public class ReactiveStreamsEndpoint extends DefaultEndpoint {
     /**
      * Number of threads used to process exchanges in the Camel route.
      */
+    @ManagedAttribute(description = "Number of threads used to process exchanges in the Camel route")
     public void setConcurrentConsumers(int concurrentConsumers) {
         this.concurrentConsumers = concurrentConsumers;
     }
@@ -128,6 +134,7 @@ public class ReactiveStreamsEndpoint extends DefaultEndpoint {
         this.backpressureStrategy = backpressureStrategy;
     }
 
+    @ManagedAttribute(description = "Determines if onComplete events should be pushed to the Camel route")
     public boolean isForwardOnComplete() {
         return forwardOnComplete;
     }
@@ -139,6 +146,7 @@ public class ReactiveStreamsEndpoint extends DefaultEndpoint {
         this.forwardOnComplete = forwardOnComplete;
     }
 
+    @ManagedAttribute(description = "Determines if onError events should be pushed to the Camel route")
     public boolean isForwardOnError() {
         return forwardOnError;
     }


[2/3] camel git commit: CAMEL-11122: camel-reactive-streams - Add more JMX information

Posted by da...@apache.org.
CAMEL-11122: camel-reactive-streams - Add more JMX information


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bfb151b0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bfb151b0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bfb151b0

Branch: refs/heads/master
Commit: bfb151b08a108e7314ea7e75c283ce167ec428d8
Parents: 5b9cf7c
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Apr 8 14:47:40 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Apr 8 15:41:40 2017 +0200

----------------------------------------------------------------------
 .../streams/ReactiveStreamsConsumer.java        | 25 +++++++++++++++++++-
 .../api/CamelReactiveStreamsService.java        |  7 ++++--
 .../engine/CamelReactiveStreamsServiceImpl.java |  6 +++--
 .../streams/engine/CamelSubscriber.java         | 17 +++++++++++--
 .../streams/engine/CamelSubscription.java       |  4 ++++
 .../support/ReactiveStreamsTestService.java     |  5 ++--
 6 files changed, 55 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/bfb151b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
index 190fa6c..8661200 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
@@ -21,8 +21,11 @@ import java.util.concurrent.ExecutorService;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
 import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
+import org.apache.camel.component.reactive.streams.engine.CamelSubscriber;
 import org.apache.camel.impl.DefaultConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,6 +33,7 @@ import org.slf4j.LoggerFactory;
 /**
  * The Camel reactive-streams consumer.
  */
+@ManagedResource(description = "Managed ReactiveStreamsConsumer")
 public class ReactiveStreamsConsumer extends DefaultConsumer {
 
     private static final Logger LOG = LoggerFactory.getLogger(ReactiveStreamsConsumer.class);
@@ -40,6 +44,8 @@ public class ReactiveStreamsConsumer extends DefaultConsumer {
 
     private CamelReactiveStreamsService service;
 
+    private volatile CamelSubscriber subscriber;
+
     public ReactiveStreamsConsumer(ReactiveStreamsEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
         this.endpoint = endpoint;
@@ -56,13 +62,14 @@ public class ReactiveStreamsConsumer extends DefaultConsumer {
             executor = getEndpoint().getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, getEndpoint().getEndpointUri(), poolSize);
         }
 
-        this.service.attachCamelConsumer(endpoint.getStream(), this);
+        this.subscriber = this.service.attachCamelConsumer(endpoint.getStream(), this);
     }
 
     @Override
     protected void doStop() throws Exception {
         super.doStop();
         this.service.detachCamelConsumer(endpoint.getStream());
+        this.subscriber = null;
 
         if (executor != null) {
             endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);
@@ -120,4 +127,20 @@ public class ReactiveStreamsConsumer extends DefaultConsumer {
     public ReactiveStreamsEndpoint getEndpoint() {
         return endpoint;
     }
+
+    @ManagedAttribute(description = "Number of inflight messages")
+    public long getInflightCount() {
+        return subscriber != null ? subscriber.getInflightCount() : 0;
+    }
+
+    @ManagedAttribute(description = "Number of messages to be requested on next request")
+    public long getToBeRequested() {
+        return subscriber != null ? subscriber.getRequested() : 0;
+    }
+
+    @ManagedAttribute(description = "Number of pending messages in the buffer")
+    public long getBufferSize() {
+        return subscriber != null ? subscriber.getBufferSize() : 0;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/bfb151b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
index cbd9fda..bdd316e 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
@@ -21,15 +21,17 @@ import java.util.function.Function;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.Service;
+import org.apache.camel.StaticService;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
+import org.apache.camel.component.reactive.streams.engine.CamelSubscriber;
 import org.reactivestreams.Publisher;
 import org.reactivestreams.Subscriber;
 
 /**
  * The interface to which any implementation of the reactive-streams engine should comply.
  */
-public interface CamelReactiveStreamsService extends CamelContextAware, Service {
+public interface CamelReactiveStreamsService extends CamelContextAware, StaticService {
 
     /*
      * Main API methods.
@@ -295,9 +297,10 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
      *
      * @param name the stream name
      * @param consumer the consumer of the route
+     * @return the associated subscriber
      * @throws IllegalStateException if another consumer is already associated with the given stream name
      */
-    void attachCamelConsumer(String name, ReactiveStreamsConsumer consumer);
+    CamelSubscriber attachCamelConsumer(String name, ReactiveStreamsConsumer consumer);
 
     /**
      * Used by Camel to detach the existing consumer from the given stream.

http://git-wip-us.apache.org/repos/asf/camel/blob/bfb151b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
index 15a10e4..1abbd94 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
@@ -276,8 +276,10 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
     }
 
     @Override
-    public void attachCamelConsumer(String name, ReactiveStreamsConsumer consumer) {
-        streamSubscriber(name).attachConsumer(consumer);
+    public CamelSubscriber attachCamelConsumer(String name, ReactiveStreamsConsumer consumer) {
+        CamelSubscriber subscriber = streamSubscriber(name);
+        subscriber.attachConsumer(consumer);
+        return subscriber;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/bfb151b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
index 098d0ca..4a232df 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
@@ -19,10 +19,8 @@ package org.apache.camel.component.reactive.streams.engine;
 import java.io.Closeable;
 import java.io.IOException;
 
-import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
-import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
 import org.reactivestreams.Subscriber;
 import org.reactivestreams.Subscription;
 import org.slf4j.Logger;
@@ -194,4 +192,19 @@ public class CamelSubscriber implements Subscriber<Exchange>, Closeable {
         }
     }
 
+    public long getRequested() {
+        return requested;
+    }
+
+    public long getInflightCount() {
+        return inflightCount;
+    }
+
+    public long getBufferSize() {
+        if (subscription != null && subscription instanceof CamelSubscription) {
+            return ((CamelSubscription) subscription).getBufferSize();
+        } else {
+            return 0;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/bfb151b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
index 16c4eea..37b6670 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
@@ -244,4 +244,8 @@ public class CamelSubscription implements Subscription {
         this.backpressureStrategy = backpressureStrategy;
         mutex.unlock();
     }
+
+    public long getBufferSize() {
+        return buffer.size();
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/bfb151b0/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
index 995c218..3f8ed0c 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
@@ -24,6 +24,7 @@ import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
 import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
 import org.apache.camel.component.reactive.streams.api.DispatchCallback;
+import org.apache.camel.component.reactive.streams.engine.CamelSubscriber;
 import org.reactivestreams.Publisher;
 import org.reactivestreams.Subscriber;
 
@@ -87,8 +88,8 @@ public class ReactiveStreamsTestService implements CamelReactiveStreamsService {
     }
 
     @Override
-    public void attachCamelConsumer(String name, ReactiveStreamsConsumer consumer) {
-
+    public CamelSubscriber attachCamelConsumer(String name, ReactiveStreamsConsumer consumer) {
+        return null;
     }
 
     @Override