You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/04/08 13:41:48 UTC

[2/3] camel git commit: CAMEL-11122: camel-reactive-streams - Add more JMX information

CAMEL-11122: camel-reactive-streams - Add more JMX information


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bfb151b0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bfb151b0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bfb151b0

Branch: refs/heads/master
Commit: bfb151b08a108e7314ea7e75c283ce167ec428d8
Parents: 5b9cf7c
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Apr 8 14:47:40 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Apr 8 15:41:40 2017 +0200

----------------------------------------------------------------------
 .../streams/ReactiveStreamsConsumer.java        | 25 +++++++++++++++++++-
 .../api/CamelReactiveStreamsService.java        |  7 ++++--
 .../engine/CamelReactiveStreamsServiceImpl.java |  6 +++--
 .../streams/engine/CamelSubscriber.java         | 17 +++++++++++--
 .../streams/engine/CamelSubscription.java       |  4 ++++
 .../support/ReactiveStreamsTestService.java     |  5 ++--
 6 files changed, 55 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/bfb151b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
index 190fa6c..8661200 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
@@ -21,8 +21,11 @@ import java.util.concurrent.ExecutorService;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
 import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
+import org.apache.camel.component.reactive.streams.engine.CamelSubscriber;
 import org.apache.camel.impl.DefaultConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,6 +33,7 @@ import org.slf4j.LoggerFactory;
 /**
  * The Camel reactive-streams consumer.
  */
+@ManagedResource(description = "Managed ReactiveStreamsConsumer")
 public class ReactiveStreamsConsumer extends DefaultConsumer {
 
     private static final Logger LOG = LoggerFactory.getLogger(ReactiveStreamsConsumer.class);
@@ -40,6 +44,8 @@ public class ReactiveStreamsConsumer extends DefaultConsumer {
 
     private CamelReactiveStreamsService service;
 
+    private volatile CamelSubscriber subscriber;
+
     public ReactiveStreamsConsumer(ReactiveStreamsEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
         this.endpoint = endpoint;
@@ -56,13 +62,14 @@ public class ReactiveStreamsConsumer extends DefaultConsumer {
             executor = getEndpoint().getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, getEndpoint().getEndpointUri(), poolSize);
         }
 
-        this.service.attachCamelConsumer(endpoint.getStream(), this);
+        this.subscriber = this.service.attachCamelConsumer(endpoint.getStream(), this);
     }
 
     @Override
     protected void doStop() throws Exception {
         super.doStop();
         this.service.detachCamelConsumer(endpoint.getStream());
+        this.subscriber = null;
 
         if (executor != null) {
             endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);
@@ -120,4 +127,20 @@ public class ReactiveStreamsConsumer extends DefaultConsumer {
     public ReactiveStreamsEndpoint getEndpoint() {
         return endpoint;
     }
+
+    @ManagedAttribute(description = "Number of inflight messages")
+    public long getInflightCount() {
+        return subscriber != null ? subscriber.getInflightCount() : 0;
+    }
+
+    @ManagedAttribute(description = "Number of messages to be requested on next request")
+    public long getToBeRequested() {
+        return subscriber != null ? subscriber.getRequested() : 0;
+    }
+
+    @ManagedAttribute(description = "Number of pending messages in the buffer")
+    public long getBufferSize() {
+        return subscriber != null ? subscriber.getBufferSize() : 0;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/bfb151b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
index cbd9fda..bdd316e 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
@@ -21,15 +21,17 @@ import java.util.function.Function;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.Service;
+import org.apache.camel.StaticService;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
+import org.apache.camel.component.reactive.streams.engine.CamelSubscriber;
 import org.reactivestreams.Publisher;
 import org.reactivestreams.Subscriber;
 
 /**
  * The interface to which any implementation of the reactive-streams engine should comply.
  */
-public interface CamelReactiveStreamsService extends CamelContextAware, Service {
+public interface CamelReactiveStreamsService extends CamelContextAware, StaticService {
 
     /*
      * Main API methods.
@@ -295,9 +297,10 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
      *
      * @param name the stream name
      * @param consumer the consumer of the route
+     * @return the associated subscriber
      * @throws IllegalStateException if another consumer is already associated with the given stream name
      */
-    void attachCamelConsumer(String name, ReactiveStreamsConsumer consumer);
+    CamelSubscriber attachCamelConsumer(String name, ReactiveStreamsConsumer consumer);
 
     /**
      * Used by Camel to detach the existing consumer from the given stream.

http://git-wip-us.apache.org/repos/asf/camel/blob/bfb151b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
index 15a10e4..1abbd94 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
@@ -276,8 +276,10 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
     }
 
     @Override
-    public void attachCamelConsumer(String name, ReactiveStreamsConsumer consumer) {
-        streamSubscriber(name).attachConsumer(consumer);
+    public CamelSubscriber attachCamelConsumer(String name, ReactiveStreamsConsumer consumer) {
+        CamelSubscriber subscriber = streamSubscriber(name);
+        subscriber.attachConsumer(consumer);
+        return subscriber;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/bfb151b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
index 098d0ca..4a232df 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
@@ -19,10 +19,8 @@ package org.apache.camel.component.reactive.streams.engine;
 import java.io.Closeable;
 import java.io.IOException;
 
-import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
-import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
 import org.reactivestreams.Subscriber;
 import org.reactivestreams.Subscription;
 import org.slf4j.Logger;
@@ -194,4 +192,19 @@ public class CamelSubscriber implements Subscriber<Exchange>, Closeable {
         }
     }
 
+    public long getRequested() {
+        return requested;
+    }
+
+    public long getInflightCount() {
+        return inflightCount;
+    }
+
+    public long getBufferSize() {
+        if (subscription != null && subscription instanceof CamelSubscription) {
+            return ((CamelSubscription) subscription).getBufferSize();
+        } else {
+            return 0;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/bfb151b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
index 16c4eea..37b6670 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
@@ -244,4 +244,8 @@ public class CamelSubscription implements Subscription {
         this.backpressureStrategy = backpressureStrategy;
         mutex.unlock();
     }
+
+    public long getBufferSize() {
+        return buffer.size();
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/bfb151b0/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
index 995c218..3f8ed0c 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
@@ -24,6 +24,7 @@ import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
 import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
 import org.apache.camel.component.reactive.streams.api.DispatchCallback;
+import org.apache.camel.component.reactive.streams.engine.CamelSubscriber;
 import org.reactivestreams.Publisher;
 import org.reactivestreams.Subscriber;
 
@@ -87,8 +88,8 @@ public class ReactiveStreamsTestService implements CamelReactiveStreamsService {
     }
 
     @Override
-    public void attachCamelConsumer(String name, ReactiveStreamsConsumer consumer) {
-
+    public CamelSubscriber attachCamelConsumer(String name, ReactiveStreamsConsumer consumer) {
+        return null;
     }
 
     @Override