You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/05/19 16:51:26 UTC
svn commit: r1340492 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/impl/ main/java/org/apache/camel/management/event/
main/java/org/apache/camel/processor/
test/java/org/apache/camel/processor/async/
Author: davsclaus
Date: Sat May 19 14:51:26 2012
New Revision: 1340492
URL: http://svn.apache.org/viewvc?rev=1340492&view=rev
Log:
CAMEL-5289: Ensure ExchangeSentEvent is emitted in async callback done.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierSendingTest.java
- copied, changed from r1340486, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
camel/trunk/camel-core/src/main/java/org/apache/camel/management/event/ExchangeSendingEvent.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java?rev=1340492&r1=1340491&r2=1340492&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java Sat May 19 14:51:26 2012
@@ -187,7 +187,7 @@ public class ProducerCache extends Servi
}
/**
- * Sends an exchange to an endpoint using a supplied callback
+ * Sends an exchange to an endpoint using a supplied callback, processing it synchronously.
* <p/>
* If an exception was thrown during processing, it would be set on the given Exchange
*
@@ -196,6 +196,7 @@ public class ProducerCache extends Servi
* @param pattern the exchange pattern, can be <tt>null</tt>
* @param callback the callback
* @return the response from the callback
+ * @see #doInAsyncProducer(org.apache.camel.Endpoint, org.apache.camel.Exchange, org.apache.camel.ExchangePattern, org.apache.camel.AsyncCallback, org.apache.camel.AsyncProducerCallback)
*/
public <T> T doInProducer(Endpoint endpoint, Exchange exchange, ExchangePattern pattern, ProducerCallback<T> callback) {
T answer = null;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/event/ExchangeSendingEvent.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/event/ExchangeSendingEvent.java?rev=1340492&r1=1340491&r2=1340492&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/event/ExchangeSendingEvent.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/event/ExchangeSendingEvent.java Sat May 19 14:51:26 2012
@@ -48,7 +48,7 @@ public class ExchangeSendingEvent extend
@Override
public String toString() {
- return getExchange().getExchangeId() + " exchange " + getExchange() + " is being sent to: " + endpoint;
+ return getExchange().getExchangeId() + " exchange " + getExchange() + " sending to: " + endpoint;
}
}
\ No newline at end of file
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1340492&r1=1340491&r2=1340492&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Sat May 19 14:51:26 2012
@@ -555,15 +555,12 @@ public class MulticastProcessor extends
final Exchange exchange = pair.getExchange();
Processor processor = pair.getProcessor();
- Producer producer = pair.getProducer();
+ final Producer producer = pair.getProducer();
TracedRouteNodes traced = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getTracedRouteNodes() : null;
// compute time taken if sending to another endpoint
- StopWatch watch = null;
- if (producer != null) {
- watch = new StopWatch();
- }
+ final StopWatch watch = producer != null ? new StopWatch() : null;
try {
// prepare tracing starting from a new block
@@ -582,6 +579,14 @@ public class MulticastProcessor extends
// we are done with the exchange pair
pair.done();
+ // okay we are done, so notify the exchange was sent
+ if (producer != null) {
+ long timeTaken = watch.stop();
+ Endpoint endpoint = producer.getEndpoint();
+ // emit event that the exchange was sent to the endpoint
+ EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
+ }
+
// we only have to handle async completion of the multicast
if (doneSync) {
return;
@@ -676,12 +681,6 @@ public class MulticastProcessor extends
if (traced != null) {
traced.popBlock();
}
- if (producer != null) {
- long timeTaken = watch.stop();
- Endpoint endpoint = producer.getEndpoint();
- // emit event that the exchange was sent to the endpoint
- EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
- }
}
return sync;
@@ -710,9 +709,9 @@ public class MulticastProcessor extends
EventHelper.notifyExchangeSending(exchange.getContext(), exchange, producer.getEndpoint());
}
// let the prepared process it, remember to begin the exchange pair
- // we invoke it synchronously as parallel async routing is too hard
AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
pair.begin();
+ // we invoke it synchronously as parallel async routing is too hard
AsyncProcessorHelper.process(async, exchange);
} finally {
pair.done();
@@ -724,6 +723,8 @@ public class MulticastProcessor extends
long timeTaken = watch.stop();
Endpoint endpoint = producer.getEndpoint();
// emit event that the exchange was sent to the endpoint
+ // this is okay to do here in the finally block, as the processing is not using the async routing engine
+ // (we invoke it synchronously as parallel async routing is too hard)
EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
}
}
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierSendingTest.java (from r1340486, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierSendingTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierSendingTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierTest.java&r1=1340486&r2=1340492&rev=1340492&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierSendingTest.java Sat May 19 14:51:26 2012
@@ -16,25 +16,24 @@
*/
package org.apache.camel.processor.async;
+import java.util.ArrayList;
import java.util.EventObject;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.List;
import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.management.event.ExchangeSendingEvent;
import org.apache.camel.management.event.ExchangeSentEvent;
import org.apache.camel.support.EventNotifierSupport;
/**
* @version
*/
-public class AsyncEndpointEventNotifierTest extends ContextTestSupport {
+public class AsyncEndpointEventNotifierSendingTest extends ContextTestSupport {
- private final CountDownLatch latch = new CountDownLatch(1);
- private final AtomicLong time = new AtomicLong();
+ private final List<EventObject> events = new ArrayList<EventObject>();
public void testAsyncEndpointEventNotifier() throws Exception {
getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
@@ -45,11 +44,16 @@ public class AsyncEndpointEventNotifierT
assertMockEndpointsSatisfied();
- assertTrue("Should count down", latch.await(10, TimeUnit.SECONDS));
+ assertEquals(8, events.size());
- long delta = time.get();
- log.info("ExchangeEventSent took ms: " + delta);
- assertTrue("Should take about 250 millis sec, was: " + delta, delta > 200);
+ assertIsInstanceOf(ExchangeSendingEvent.class, events.get(0));
+ assertIsInstanceOf(ExchangeSendingEvent.class, events.get(1));
+ assertIsInstanceOf(ExchangeSentEvent.class, events.get(2));
+ assertIsInstanceOf(ExchangeSendingEvent.class, events.get(3));
+ assertIsInstanceOf(ExchangeSentEvent.class, events.get(4));
+ assertIsInstanceOf(ExchangeSendingEvent.class, events.get(5));
+ assertIsInstanceOf(ExchangeSentEvent.class, events.get(6));
+ assertIsInstanceOf(ExchangeSentEvent.class, events.get(7));
}
@Override
@@ -57,21 +61,12 @@ public class AsyncEndpointEventNotifierT
DefaultCamelContext context = new DefaultCamelContext(createRegistry());
context.getManagementStrategy().addEventNotifier(new EventNotifierSupport() {
public void notify(EventObject event) throws Exception {
- try {
- ExchangeSentEvent sent = (ExchangeSentEvent) event;
- time.set(sent.getTimeTaken());
- } finally {
- latch.countDown();
- }
+ System.out.println(event);
+ events.add(event);
}
public boolean isEnabled(EventObject event) {
- // we only want the async endpoint
- if (event instanceof ExchangeSentEvent) {
- ExchangeSentEvent sent = (ExchangeSentEvent) event;
- return sent.getEndpoint().getEndpointUri().startsWith("async");
- }
- return false;
+ return event instanceof ExchangeSendingEvent || event instanceof ExchangeSentEvent;
}
@Override