You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/05/19 16:51:26 UTC

svn commit: r1340492 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/impl/ main/java/org/apache/camel/management/event/ main/java/org/apache/camel/processor/ test/java/org/apache/camel/processor/async/

Author: davsclaus
Date: Sat May 19 14:51:26 2012
New Revision: 1340492

URL: http://svn.apache.org/viewvc?rev=1340492&view=rev
Log:
CAMEL-5289: Ensure ExchangeSentEvent is emitted in async callback done.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierSendingTest.java
      - copied, changed from r1340486, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/management/event/ExchangeSendingEvent.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java?rev=1340492&r1=1340491&r2=1340492&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java Sat May 19 14:51:26 2012
@@ -187,7 +187,7 @@ public class ProducerCache extends Servi
     }
 
     /**
-     * Sends an exchange to an endpoint using a supplied callback
+     * Sends an exchange to an endpoint using a supplied callback, using synchronous processing.
      * <p/>
      * If an exception was thrown during processing, it would be set on the given Exchange
      *
@@ -196,6 +196,7 @@ public class ProducerCache extends Servi
      * @param pattern   the exchange pattern, can be <tt>null</tt>
      * @param callback  the callback
      * @return the response from the callback
+     * @see #doInAsyncProducer(org.apache.camel.Endpoint, org.apache.camel.Exchange, org.apache.camel.ExchangePattern, org.apache.camel.AsyncCallback, org.apache.camel.AsyncProducerCallback)
      */
     public <T> T doInProducer(Endpoint endpoint, Exchange exchange, ExchangePattern pattern, ProducerCallback<T> callback) {
         T answer = null;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/event/ExchangeSendingEvent.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/event/ExchangeSendingEvent.java?rev=1340492&r1=1340491&r2=1340492&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/event/ExchangeSendingEvent.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/event/ExchangeSendingEvent.java Sat May 19 14:51:26 2012
@@ -48,7 +48,7 @@ public class ExchangeSendingEvent extend
 
     @Override
     public String toString() {
-        return getExchange().getExchangeId() + " exchange " + getExchange() + " is being sent to: " + endpoint;
+        return getExchange().getExchangeId() + " exchange " + getExchange() + " sending to: " + endpoint;
     }
 
 }
\ No newline at end of file

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1340492&r1=1340491&r2=1340492&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Sat May 19 14:51:26 2012
@@ -555,15 +555,12 @@ public class MulticastProcessor extends 
 
         final Exchange exchange = pair.getExchange();
         Processor processor = pair.getProcessor();
-        Producer producer = pair.getProducer();
+        final Producer producer = pair.getProducer();
 
         TracedRouteNodes traced = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getTracedRouteNodes() : null;
 
         // compute time taken if sending to another endpoint
-        StopWatch watch = null;
-        if (producer != null) {
-            watch = new StopWatch();
-        }
+        final StopWatch watch = producer != null ? new StopWatch() : null;
 
         try {
             // prepare tracing starting from a new block
@@ -582,6 +579,14 @@ public class MulticastProcessor extends 
                     // we are done with the exchange pair
                     pair.done();
 
+                    // okay we are done, so notify the exchange was sent
+                    if (producer != null) {
+                        long timeTaken = watch.stop();
+                        Endpoint endpoint = producer.getEndpoint();
+                        // emit event that the exchange was sent to the endpoint
+                        EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
+                    }
+
                     // we only have to handle async completion of the routing slip
                     if (doneSync) {
                         return;
@@ -676,12 +681,6 @@ public class MulticastProcessor extends 
             if (traced != null) {
                 traced.popBlock();
             }
-            if (producer != null) {
-                long timeTaken = watch.stop();
-                Endpoint endpoint = producer.getEndpoint();
-                // emit event that the exchange was sent to the endpoint
-                EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
-            }
         }
 
         return sync;
@@ -710,9 +709,9 @@ public class MulticastProcessor extends 
                 EventHelper.notifyExchangeSending(exchange.getContext(), exchange, producer.getEndpoint());
             }
             // let the prepared process it, remember to begin the exchange pair
-            // we invoke it synchronously as parallel async routing is too hard
             AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
             pair.begin();
+            // we invoke it synchronously as parallel async routing is too hard
             AsyncProcessorHelper.process(async, exchange);
         } finally {
             pair.done();
@@ -724,6 +723,8 @@ public class MulticastProcessor extends 
                 long timeTaken = watch.stop();
                 Endpoint endpoint = producer.getEndpoint();
                 // emit event that the exchange was sent to the endpoint
+                // this is okay to do here in the finally block, as the processing is not using the async routing engine
+                // (we invoke it synchronously as parallel async routing is too hard)
                 EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
             }
         }

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierSendingTest.java (from r1340486, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierSendingTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierSendingTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierTest.java&r1=1340486&r2=1340492&rev=1340492&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierSendingTest.java Sat May 19 14:51:26 2012
@@ -16,25 +16,24 @@
  */
 package org.apache.camel.processor.async;
 
+import java.util.ArrayList;
 import java.util.EventObject;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.List;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.management.event.ExchangeSendingEvent;
 import org.apache.camel.management.event.ExchangeSentEvent;
 import org.apache.camel.support.EventNotifierSupport;
 
 /**
  * @version 
  */
-public class AsyncEndpointEventNotifierTest extends ContextTestSupport {
+public class AsyncEndpointEventNotifierSendingTest extends ContextTestSupport {
 
-    private final CountDownLatch latch = new CountDownLatch(1);
-    private final AtomicLong time = new AtomicLong();
+    private final List<EventObject> events = new ArrayList<EventObject>();
 
     public void testAsyncEndpointEventNotifier() throws Exception {
         getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
@@ -45,11 +44,16 @@ public class AsyncEndpointEventNotifierT
 
         assertMockEndpointsSatisfied();
 
-        assertTrue("Should count down", latch.await(10, TimeUnit.SECONDS));
+        assertEquals(8, events.size());
 
-        long delta = time.get();
-        log.info("ExchangeEventSent took ms: " + delta);
-        assertTrue("Should take about 250 millis sec, was: " + delta, delta > 200);
+        assertIsInstanceOf(ExchangeSendingEvent.class, events.get(0));
+        assertIsInstanceOf(ExchangeSendingEvent.class, events.get(1));
+        assertIsInstanceOf(ExchangeSentEvent.class, events.get(2));
+        assertIsInstanceOf(ExchangeSendingEvent.class, events.get(3));
+        assertIsInstanceOf(ExchangeSentEvent.class, events.get(4));
+        assertIsInstanceOf(ExchangeSendingEvent.class, events.get(5));
+        assertIsInstanceOf(ExchangeSentEvent.class, events.get(6));
+        assertIsInstanceOf(ExchangeSentEvent.class, events.get(7));
     }
 
     @Override
@@ -57,21 +61,12 @@ public class AsyncEndpointEventNotifierT
         DefaultCamelContext context = new DefaultCamelContext(createRegistry());
         context.getManagementStrategy().addEventNotifier(new EventNotifierSupport() {
             public void notify(EventObject event) throws Exception {
-                try {
-                    ExchangeSentEvent sent = (ExchangeSentEvent) event;
-                    time.set(sent.getTimeTaken());
-                } finally {
-                    latch.countDown();
-                }
+                System.out.println(event);
+                events.add(event);
             }
 
             public boolean isEnabled(EventObject event) {
-                // we only want the async endpoint
-                if (event instanceof ExchangeSentEvent) {
-                    ExchangeSentEvent sent = (ExchangeSentEvent) event;
-                    return sent.getEndpoint().getEndpointUri().startsWith("async");
-                }
-                return false;
+                return event instanceof ExchangeSendingEvent || event instanceof ExchangeSentEvent;
             }
 
             @Override