You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/06/18 09:42:56 UTC

svn commit: r955873 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/ main/java/org/apache/camel/processor/ main/java/org/apache/camel/spi/ test/java/org/apache/camel/processor/async/

Author: davsclaus
Date: Fri Jun 18 07:42:56 2010
New Revision: 955873

URL: http://svn.apache.org/viewvc?rev=955873&view=rev
Log:
CAMEL-2723: Warn if custom interceptor is not compatible with the async routing engine.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java   (contents, props changed)
      - copied, changed from r955852, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java
Removed:
    camel/trunk/camel-core/src/main/java/org/apache/camel/Intercept.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InterceptStrategy.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java?rev=955873&r1=955872&r2=955873&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java Fri Jun 18 07:42:56 2010
@@ -179,6 +179,12 @@ public class DefaultChannel extends Serv
                 continue;
             }
             target = strategy.wrapProcessorInInterceptors(routeContext.getCamelContext(), outputDefinition, target, next);
+            if (!(target instanceof AsyncProcessor)) {
+                // warn if interceptor is not async compatible
+                LOG.warn("Interceptor: " + strategy + " at: " + outputDefinition + " does not return an AsyncProcessor instance."
+                    + " This causes the asynchronous routing engine to not work as optimal as possible."
+                    + " See more details at the InterceptStrategy javadoc.");
+            }
         }
 
         // sets the delegate to our wrapped output

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InterceptStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InterceptStrategy.java?rev=955873&r1=955872&r2=955873&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InterceptStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InterceptStrategy.java Fri Jun 18 07:42:56 2010
@@ -24,21 +24,26 @@ import org.apache.camel.model.ProcessorD
  * The purpose of this interface is to allow an implementation to wrap
  * processors in a route with interceptors.  For example, a possible
  * usecase is to gather performance statistics at the processor's level.
+ * <p/>
+ * It is <b>strongly</b> advised to use an {@link org.apache.camel.AsyncProcessor} as the returned wrapped
+ * {@link Processor} which ensures the interceptor works well with the asynchronous routing engine.
+ * You can use the {@link org.apache.camel.processor.DelegateAsyncProcessor} to easily return an
+ * {@link org.apache.camel.AsyncProcessor} and override the
+ * {@link org.apache.camel.AsyncProcessor#process(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)} to
+ * implement your interceptor logic, then invoke the super method to <b>continue</b> routing.
  *
  * @version $Revision$
  */
 public interface InterceptStrategy {
 
-    // TODO: We should force this strategy to return AsyncProcessor so custom interceptors work nicely with async
-
     /**
      * This method is invoked by
      * {@link ProcessorDefinition#wrapProcessor(RouteContext, Processor)}
      * to give the implementor an opportunity to wrap the target processor
      * in a route.
      * <p/>
-     * Its adviced to use an {@link org.apache.camel.AsyncProcessor} as the returned wrapped
-     * {@link Processor} which ensures the interceptor works well with the asynchronous routing engine.
+     * <b>Important:</b> See the class javadoc for advice on making the interceptor compatible with the
+     * asynchronous routing engine.
      *
      * @param context       Camel context
      * @param definition    the model this interceptor represents

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java (from r955852, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java&r1=955852&r2=955873&rev=955873&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java Fri Jun 18 07:42:56 2010
@@ -16,18 +16,26 @@
  */
 package org.apache.camel.processor.async;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.processor.DelegateAsyncProcessor;
+import org.apache.camel.spi.InterceptStrategy;
 
 /**
  * @version $Revision$
  */
-public class AsyncEndpointTest extends ContextTestSupport {
+public class AsyncEndpointCustomInterceptorTest extends ContextTestSupport {
 
     private static String beforeThreadName;
     private static String afterThreadName;
+    private MyInterceptor interceptor = new MyInterceptor();
 
     public void testAsyncEndpoint() throws Exception {
         getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
@@ -39,6 +47,8 @@ public class AsyncEndpointTest extends C
 
         assertMockEndpointsSatisfied();
 
+        assertEquals(8, interceptor.getCounter());
+
         assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
     }
 
@@ -48,6 +58,7 @@ public class AsyncEndpointTest extends C
             @Override
             public void configure() throws Exception {
                 context.addComponent("async", new MyAsyncComponent());
+                context.addInterceptStrategy(interceptor);
 
                 from("direct:start")
                         .to("mock:before")
@@ -70,4 +81,33 @@ public class AsyncEndpointTest extends C
         };
     }
 
-}
+    // START SNIPPET: e1
+    private class MyInterceptor implements InterceptStrategy {
+        private AtomicInteger counter = new AtomicInteger();
+
+        public Processor wrapProcessorInInterceptors(final CamelContext context, final ProcessorDefinition<?> definition,
+                                                     final Processor target, final Processor nextTarget) throws Exception {
+
+            // use DelegateAsyncProcessor to ensure the interceptor works well with the asynchronous routing
+            // engine in Camel.
+            // The target is the processor to continue routing to, which we must provide
+            // in the constructor of the DelegateAsyncProcessor
+            return new DelegateAsyncProcessor(target) {
+                @Override
+                public boolean process(Exchange exchange, AsyncCallback callback) {
+                    // we just want to count number of interceptions
+                    counter.incrementAndGet();
+
+                    // invoke super to continue routing the message
+                    return super.process(exchange, callback);
+                }
+            };
+        }
+
+        public int getCounter() {
+            return counter.get();
+        }
+    }
+    // END SNIPPET: e1
+
+}
\ No newline at end of file

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date