You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/04/14 16:22:43 UTC

[2/2] camel git commit: CAMEL-8634: Wire tap - Should emit event notification about sending to tapped endpoint

CAMEL-8634: Wire tap - Should emit event notification about sending to tapped endpoint


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/22766a51
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/22766a51
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/22766a51

Branch: refs/heads/camel-2.15.x
Commit: 22766a51cabca583c3e48e88731bea58823ae0a0
Parents: d408674
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Apr 14 16:25:08 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Apr 14 16:25:33 2015 +0200

----------------------------------------------------------------------
 .../camel/processor/WireTapProcessor.java       | 12 +++++--
 .../EventNotifierExchangeSentParallelTest.java  |  2 ++
 .../EventNotifierExchangeSentTest.java          | 38 ++++++++++++++++++++
 3 files changed, 50 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/22766a51/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index 3f71226..9447843 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -36,9 +36,11 @@ import org.apache.camel.Traceable;
 import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.EventHelper;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
+import org.apache.camel.util.StopWatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -90,7 +92,7 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor,
         AsyncProcessorHelper.process(this, exchange);
     }
 
-    public boolean process(Exchange exchange, final AsyncCallback callback) {
+    public boolean process(final Exchange exchange, final AsyncCallback callback) {
         if (!isStarted()) {
             throw new IllegalStateException("WireTapProcessor has not been started: " + this);
         }
@@ -110,14 +112,20 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor,
         // send the exchange to the destination using an executor service
         executorService.submit(new Callable<Exchange>() {
             public Exchange call() throws Exception {
+                final StopWatch watch = new StopWatch();
                 try {
+                    EventHelper.notifyExchangeSending(wireTapExchange.getContext(), wireTapExchange, destination);
                     LOG.debug(">>>> (wiretap) {} {}", destination, wireTapExchange);
                     processor.process(wireTapExchange);
                 } catch (Throwable e) {
                     LOG.warn("Error occurred during processing " + wireTapExchange + " wiretap to " + destination + ". This exception will be ignored.", e);
+                } finally {
+                    // emit event that the exchange was sent to the endpoint
+                    long timeTaken = watch.stop();
+                    EventHelper.notifyExchangeSent(wireTapExchange.getContext(), wireTapExchange, destination, timeTaken);
                 }
                 return wireTapExchange;
-            };
+            }
         });
 
         // continue routing this synchronously

http://git-wip-us.apache.org/repos/asf/camel/blob/22766a51/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeSentParallelTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeSentParallelTest.java b/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeSentParallelTest.java
index a84ec1a..6131bda 100644
--- a/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeSentParallelTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeSentParallelTest.java
@@ -67,6 +67,8 @@ public class EventNotifierExchangeSentParallelTest extends EventNotifierExchange
                 from("direct:foo").recipientList(header("foo")).parallelProcessing();
 
                 from("direct:cool").delay(1000);
+
+                from("direct:tap").wireTap("log:foo").to("mock:result");
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/22766a51/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeSentTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeSentTest.java b/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeSentTest.java
index 888c2f7..8428258 100644
--- a/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeSentTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeSentTest.java
@@ -145,6 +145,42 @@ public class EventNotifierExchangeSentTest extends ContextTestSupport {
         assertEquals("direct://foo", e11.getEndpoint().getEndpointUri());
     }
 
+    public void testExchangeWireTap() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+
+        template.sendBody("direct:tap", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+        // give it time to complete
+        Thread.sleep(200);
+
+        assertEquals(6, events.size());
+
+        // the tap to log:foo runs asynchronously, so its sending and sent
+        // events can appear anywhere in the list; scan all events for them
+        boolean found = false;
+        boolean found2 = false;
+        for (EventObject event : events) {
+            if (event instanceof ExchangeSendingEvent) {
+                ExchangeSendingEvent sending = (ExchangeSendingEvent) event;
+                String uri = sending.getEndpoint().getEndpointUri();
+                if ("log://foo".equals(uri))  {
+                    found = true;
+                }
+            } else if (event instanceof ExchangeSentEvent) {
+                ExchangeSentEvent sent = (ExchangeSentEvent) event;
+                String uri = sent.getEndpoint().getEndpointUri();
+                if ("log://foo".equals(uri))  {
+                    found2 = true;
+                }
+            }
+        }
+
+        assertTrue("We should find log:foo being wire tapped", found);
+        assertTrue("We should find log:foo being wire tapped", found2);
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
@@ -157,6 +193,8 @@ public class EventNotifierExchangeSentTest extends ContextTestSupport {
                 from("direct:foo").recipientList().header("foo");
 
                 from("direct:cool").delay(1000);
+
+                from("direct:tap").wireTap("log:foo").to("mock:result");
             }
         };
     }