You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/06/18 09:42:56 UTC
svn commit: r955873 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/ main/java/org/apache/camel/processor/
main/java/org/apache/camel/spi/ test/java/org/apache/camel/processor/async/
Author: davsclaus
Date: Fri Jun 18 07:42:56 2010
New Revision: 955873
URL: http://svn.apache.org/viewvc?rev=955873&view=rev
Log:
CAMEL-2723: Warn if custom interceptor is not compatible with the async routing engine.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java (contents, props changed)
- copied, changed from r955852, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java
Removed:
camel/trunk/camel-core/src/main/java/org/apache/camel/Intercept.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InterceptStrategy.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java?rev=955873&r1=955872&r2=955873&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java Fri Jun 18 07:42:56 2010
@@ -179,6 +179,12 @@ public class DefaultChannel extends Serv
continue;
}
target = strategy.wrapProcessorInInterceptors(routeContext.getCamelContext(), outputDefinition, target, next);
+ if (!(target instanceof AsyncProcessor)) {
+ // warn if interceptor is not async compatible
+ LOG.warn("Interceptor: " + strategy + " at: " + outputDefinition + " does not return an AsyncProcessor instance."
+ + " This causes the asynchronous routing engine to not work as optimal as possible."
+ + " See more details at the InterceptStrategy javadoc.");
+ }
}
// sets the delegate to our wrapped output
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InterceptStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InterceptStrategy.java?rev=955873&r1=955872&r2=955873&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InterceptStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InterceptStrategy.java Fri Jun 18 07:42:56 2010
@@ -24,21 +24,26 @@ import org.apache.camel.model.ProcessorD
* The purpose of this interface is to allow an implementation to wrap
* processors in a route with interceptors. For example, a possible
* usecase is to gather performance statistics at the processor's level.
+ * <p/>
+ * Its <b>strongly</b> adviced to use an {@link org.apache.camel.AsyncProcessor} as the returned wrapped
+ * {@link Processor} which ensures the interceptor works well with the asynchronous routing engine.
+ * You can use the {@link org.apache.camel.processor.DelegateAsyncProcessor} to easily return an
+ * {@link org.apache.camel.AsyncProcessor} and override the
+ * {@link org.apache.camel.AsyncProcessor#process(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)} to
+ * implement your interceptor logic. And just invoke the super method to <b>continue</b> routing.
*
* @version $Revision$
*/
public interface InterceptStrategy {
- // TODO: We should force this strategy to return AsyncProcessor so custom interceptors work nicely with async
-
/**
* This method is invoked by
* {@link ProcessorDefinition#wrapProcessor(RouteContext, Processor)}
* to give the implementor an opportunity to wrap the target processor
* in a route.
* <p/>
- * Its adviced to use an {@link org.apache.camel.AsyncProcessor} as the returned wrapped
- * {@link Processor} which ensures the interceptor works well with the asynchronous routing engine.
+ * <b>Important:</b> See the class javadoc for advice on letting interceptor be compatible with the
+ * asynchronous routing engine.
*
* @param context Camel context
* @param definition the model this interceptor represents
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java (from r955852, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java&r1=955852&r2=955873&rev=955873&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java Fri Jun 18 07:42:56 2010
@@ -16,18 +16,26 @@
*/
package org.apache.camel.processor.async;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.processor.DelegateAsyncProcessor;
+import org.apache.camel.spi.InterceptStrategy;
/**
* @version $Revision$
*/
-public class AsyncEndpointTest extends ContextTestSupport {
+public class AsyncEndpointCustomInterceptorTest extends ContextTestSupport {
private static String beforeThreadName;
private static String afterThreadName;
+ private MyInterceptor interceptor = new MyInterceptor();
public void testAsyncEndpoint() throws Exception {
getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
@@ -39,6 +47,8 @@ public class AsyncEndpointTest extends C
assertMockEndpointsSatisfied();
+ assertEquals(8, interceptor.getCounter());
+
assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
}
@@ -48,6 +58,7 @@ public class AsyncEndpointTest extends C
@Override
public void configure() throws Exception {
context.addComponent("async", new MyAsyncComponent());
+ context.addInterceptStrategy(interceptor);
from("direct:start")
.to("mock:before")
@@ -70,4 +81,33 @@ public class AsyncEndpointTest extends C
};
}
-}
+ // START SNIPPET: e1
+ private class MyInterceptor implements InterceptStrategy {
+ private AtomicInteger counter = new AtomicInteger();
+
+ public Processor wrapProcessorInInterceptors(final CamelContext context, final ProcessorDefinition<?> definition,
+ final Processor target, final Processor nextTarget) throws Exception {
+
+ // use DelegateAsyncProcessor to ensure the interceptor works well with the asynchronous routing
+ // engine in Camel.
+ // The target is the processor to continue routing to, which we must provide
+ // in the constructor of the DelegateAsyncProcessor
+ return new DelegateAsyncProcessor(target) {
+ @Override
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ // we just want to count number of interceptions
+ counter.incrementAndGet();
+
+ // invoke super to continue routing the message
+ return super.process(exchange, callback);
+ }
+ };
+ }
+
+ public int getCounter() {
+ return counter.get();
+ }
+ }
+ // END SNIPPET: e1
+
+}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date