You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/04/14 16:22:43 UTC
[2/2] camel git commit: CAMEL-8634: Wire tap - Should emit event notification about sending to tapped endpoint
CAMEL-8634: Wire tap - Should emit event notification about sending to tapped endpoint
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/22766a51
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/22766a51
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/22766a51
Branch: refs/heads/camel-2.15.x
Commit: 22766a51cabca583c3e48e88731bea58823ae0a0
Parents: d408674
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Apr 14 16:25:08 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Apr 14 16:25:33 2015 +0200
----------------------------------------------------------------------
.../camel/processor/WireTapProcessor.java | 12 +++++--
.../EventNotifierExchangeSentParallelTest.java | 2 ++
.../EventNotifierExchangeSentTest.java | 38 ++++++++++++++++++++
3 files changed, 50 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/22766a51/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index 3f71226..9447843 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -36,9 +36,11 @@ import org.apache.camel.Traceable;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.EventHelper;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
+import org.apache.camel.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,7 +92,7 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor,
AsyncProcessorHelper.process(this, exchange);
}
- public boolean process(Exchange exchange, final AsyncCallback callback) {
+ public boolean process(final Exchange exchange, final AsyncCallback callback) {
if (!isStarted()) {
throw new IllegalStateException("WireTapProcessor has not been started: " + this);
}
@@ -110,14 +112,20 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor,
// send the exchange to the destination using an executor service
executorService.submit(new Callable<Exchange>() {
public Exchange call() throws Exception {
+ final StopWatch watch = new StopWatch();
try {
+ EventHelper.notifyExchangeSending(wireTapExchange.getContext(), wireTapExchange, destination);
LOG.debug(">>>> (wiretap) {} {}", destination, wireTapExchange);
processor.process(wireTapExchange);
} catch (Throwable e) {
LOG.warn("Error occurred during processing " + wireTapExchange + " wiretap to " + destination + ". This exception will be ignored.", e);
+ } finally {
+ // emit event that the exchange was sent to the endpoint
+ long timeTaken = watch.stop();
+ EventHelper.notifyExchangeSent(wireTapExchange.getContext(), wireTapExchange, destination, timeTaken);
}
return wireTapExchange;
- };
+ }
});
// continue routing this synchronously
http://git-wip-us.apache.org/repos/asf/camel/blob/22766a51/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeSentParallelTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeSentParallelTest.java b/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeSentParallelTest.java
index a84ec1a..6131bda 100644
--- a/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeSentParallelTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeSentParallelTest.java
@@ -67,6 +67,8 @@ public class EventNotifierExchangeSentParallelTest extends EventNotifierExchange
from("direct:foo").recipientList(header("foo")).parallelProcessing();
from("direct:cool").delay(1000);
+
+ from("direct:tap").wireTap("log:foo").to("mock:result");
}
};
}
http://git-wip-us.apache.org/repos/asf/camel/blob/22766a51/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeSentTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeSentTest.java b/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeSentTest.java
index 888c2f7..8428258 100644
--- a/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeSentTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeSentTest.java
@@ -145,6 +145,42 @@ public class EventNotifierExchangeSentTest extends ContextTestSupport {
assertEquals("direct://foo", e11.getEndpoint().getEndpointUri());
}
+ public void testExchangeWireTap() throws Exception {
+ getMockEndpoint("mock:result").expectedMessageCount(1);
+
+ template.sendBody("direct:tap", "Hello World");
+
+ assertMockEndpointsSatisfied();
+
+ // give it time to complete
+ Thread.sleep(200);
+
+ assertEquals(6, events.size());
+
+ // we should find log:foo which we tapped
+ // which runs async so they can be in random order
+ boolean found = false;
+ boolean found2 = false;
+ for (EventObject event : events) {
+ if (event instanceof ExchangeSendingEvent) {
+ ExchangeSendingEvent sending = (ExchangeSendingEvent) event;
+ String uri = sending.getEndpoint().getEndpointUri();
+ if ("log://foo".equals(uri)) {
+ found = true;
+ }
+ } else if (event instanceof ExchangeSentEvent) {
+ ExchangeSentEvent sent = (ExchangeSentEvent) event;
+ String uri = sent.getEndpoint().getEndpointUri();
+ if ("log://foo".equals(uri)) {
+ found2 = true;
+ }
+ }
+ }
+
+ assertTrue("We should find log:foo being wire tapped", found);
+ assertTrue("We should find log:foo being wire tapped", found2);
+ }
+
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@@ -157,6 +193,8 @@ public class EventNotifierExchangeSentTest extends ContextTestSupport {
from("direct:foo").recipientList().header("foo");
from("direct:cool").delay(1000);
+
+ from("direct:tap").wireTap("log:foo").to("mock:result");
}
};
}