You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/04/08 13:41:48 UTC
[2/3] camel git commit: CAMEL-11122: camel-reactive-streams - Add
more JMX information
CAMEL-11122: camel-reactive-streams - Add more JMX information
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bfb151b0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bfb151b0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bfb151b0
Branch: refs/heads/master
Commit: bfb151b08a108e7314ea7e75c283ce167ec428d8
Parents: 5b9cf7c
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Apr 8 14:47:40 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Apr 8 15:41:40 2017 +0200
----------------------------------------------------------------------
.../streams/ReactiveStreamsConsumer.java | 25 +++++++++++++++++++-
.../api/CamelReactiveStreamsService.java | 7 ++++--
.../engine/CamelReactiveStreamsServiceImpl.java | 6 +++--
.../streams/engine/CamelSubscriber.java | 17 +++++++++++--
.../streams/engine/CamelSubscription.java | 4 ++++
.../support/ReactiveStreamsTestService.java | 5 ++--
6 files changed, 55 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/bfb151b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
index 190fa6c..8661200 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
@@ -21,8 +21,11 @@ import java.util.concurrent.ExecutorService;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
+import org.apache.camel.component.reactive.streams.engine.CamelSubscriber;
import org.apache.camel.impl.DefaultConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,6 +33,7 @@ import org.slf4j.LoggerFactory;
/**
* The Camel reactive-streams consumer.
*/
+@ManagedResource(description = "Managed ReactiveStreamsConsumer")
public class ReactiveStreamsConsumer extends DefaultConsumer {
private static final Logger LOG = LoggerFactory.getLogger(ReactiveStreamsConsumer.class);
@@ -40,6 +44,8 @@ public class ReactiveStreamsConsumer extends DefaultConsumer {
private CamelReactiveStreamsService service;
+ private volatile CamelSubscriber subscriber;
+
public ReactiveStreamsConsumer(ReactiveStreamsEndpoint endpoint, Processor processor) {
super(endpoint, processor);
this.endpoint = endpoint;
@@ -56,13 +62,14 @@ public class ReactiveStreamsConsumer extends DefaultConsumer {
executor = getEndpoint().getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, getEndpoint().getEndpointUri(), poolSize);
}
- this.service.attachCamelConsumer(endpoint.getStream(), this);
+ this.subscriber = this.service.attachCamelConsumer(endpoint.getStream(), this);
}
@Override
protected void doStop() throws Exception {
super.doStop();
this.service.detachCamelConsumer(endpoint.getStream());
+ this.subscriber = null;
if (executor != null) {
endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);
@@ -120,4 +127,20 @@ public class ReactiveStreamsConsumer extends DefaultConsumer {
public ReactiveStreamsEndpoint getEndpoint() {
return endpoint;
}
+
+ @ManagedAttribute(description = "Number of inflight messages")
+ public long getInflightCount() {
+ return subscriber != null ? subscriber.getInflightCount() : 0;
+ }
+
+ @ManagedAttribute(description = "Number of messages to be requested on next request")
+ public long getToBeRequested() {
+ return subscriber != null ? subscriber.getRequested() : 0;
+ }
+
+ @ManagedAttribute(description = "Number of pending messages in the buffer")
+ public long getBufferSize() {
+ return subscriber != null ? subscriber.getBufferSize() : 0;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/bfb151b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
index cbd9fda..bdd316e 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
@@ -21,15 +21,17 @@ import java.util.function.Function;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
import org.apache.camel.Service;
+import org.apache.camel.StaticService;
import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
+import org.apache.camel.component.reactive.streams.engine.CamelSubscriber;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
/**
* The interface to which any implementation of the reactive-streams engine should comply.
*/
-public interface CamelReactiveStreamsService extends CamelContextAware, Service {
+public interface CamelReactiveStreamsService extends CamelContextAware, StaticService {
/*
* Main API methods.
@@ -295,9 +297,10 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
*
* @param name the stream name
* @param consumer the consumer of the route
+ * @return the associated subscriber
* @throws IllegalStateException if another consumer is already associated with the given stream name
*/
- void attachCamelConsumer(String name, ReactiveStreamsConsumer consumer);
+ CamelSubscriber attachCamelConsumer(String name, ReactiveStreamsConsumer consumer);
/**
* Used by Camel to detach the existing consumer from the given stream.
http://git-wip-us.apache.org/repos/asf/camel/blob/bfb151b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
index 15a10e4..1abbd94 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
@@ -276,8 +276,10 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
}
@Override
- public void attachCamelConsumer(String name, ReactiveStreamsConsumer consumer) {
- streamSubscriber(name).attachConsumer(consumer);
+ public CamelSubscriber attachCamelConsumer(String name, ReactiveStreamsConsumer consumer) {
+ CamelSubscriber subscriber = streamSubscriber(name);
+ subscriber.attachConsumer(consumer);
+ return subscriber;
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/bfb151b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
index 098d0ca..4a232df 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
@@ -19,10 +19,8 @@ package org.apache.camel.component.reactive.streams.engine;
import java.io.Closeable;
import java.io.IOException;
-import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
-import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
@@ -194,4 +192,19 @@ public class CamelSubscriber implements Subscriber<Exchange>, Closeable {
}
}
+ public long getRequested() {
+ return requested;
+ }
+
+ public long getInflightCount() {
+ return inflightCount;
+ }
+
+ public long getBufferSize() {
+ if (subscription != null && subscription instanceof CamelSubscription) {
+ return ((CamelSubscription) subscription).getBufferSize();
+ } else {
+ return 0;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/bfb151b0/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
index 16c4eea..37b6670 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
@@ -244,4 +244,8 @@ public class CamelSubscription implements Subscription {
this.backpressureStrategy = backpressureStrategy;
mutex.unlock();
}
+
+ public long getBufferSize() {
+ return buffer.size();
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/bfb151b0/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
index 995c218..3f8ed0c 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
@@ -24,6 +24,7 @@ import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.apache.camel.component.reactive.streams.api.DispatchCallback;
+import org.apache.camel.component.reactive.streams.engine.CamelSubscriber;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
@@ -87,8 +88,8 @@ public class ReactiveStreamsTestService implements CamelReactiveStreamsService {
}
@Override
- public void attachCamelConsumer(String name, ReactiveStreamsConsumer consumer) {
-
+ public CamelSubscriber attachCamelConsumer(String name, ReactiveStreamsConsumer consumer) {
+ return null;
}
@Override