You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/06/25 17:41:09 UTC

svn commit: r958008 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/processor/ test/java/org/apache/camel/processor/async/

Author: davsclaus
Date: Fri Jun 25 15:41:09 2010
New Revision: 958008

URL: http://svn.apache.org/viewvc?rev=958008&view=rev
Log:
CAMEL-2836: Using a bridge to adapt custom interceptors to the async routing engine to let async still be possible.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomAsyncInterceptorTest.java
      - copied, changed from r957928, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java?rev=958008&r1=958007&r2=958008&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java Fri Jun 25 15:41:09 2010
@@ -178,13 +178,22 @@ public class DefaultChannel extends Serv
             if (strategy instanceof Tracer) {
                 continue;
             }
-            target = strategy.wrapProcessorInInterceptors(routeContext.getCamelContext(), outputDefinition, target, next);
-            if (!(target instanceof AsyncProcessor)) {
-                // warn if interceptor is not async compatible
+            Processor wrapped = strategy.wrapProcessorInInterceptors(routeContext.getCamelContext(), outputDefinition, target, next); 
+            if (!(wrapped instanceof AsyncProcessor)) {
                 LOG.warn("Interceptor: " + strategy + " at: " + outputDefinition + " does not return an AsyncProcessor instance."
                     + " This causes the asynchronous routing engine to not work as optimal as possible."
-                    + " See more details at the InterceptStrategy javadoc.");
+                    + " See more details at the InterceptStrategy javadoc."
+                    + " Camel will use a bridge to adapt the interceptor to the asynchronous routing engine,"
+                    + " but its not the most optimal solution. Please consider changing your interceptor to comply.");
+
+                // use a bridge and wrap again, which allows us to adapt and leverage the asynchronous routing engine anyway;
+                // it's not the most optimal solution, but we can still run.
+                InterceptorToAsyncProcessorBridge bridge = new InterceptorToAsyncProcessorBridge(target);
+                wrapped = strategy.wrapProcessorInInterceptors(routeContext.getCamelContext(), outputDefinition, bridge, next);
+                bridge.setTarget(wrapped);
+                wrapped = bridge;
             }
+            target = wrapped;
         }
 
         // sets the delegate to our wrapped output

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java?rev=958008&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java Fri Jun 25 15:41:09 2010
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
+
+/**
+ * A bridge which lets regular interceptors, implemented as a plain {@link org.apache.camel.Processor},
+ * work with the asynchronous routing engine by handing the {@link AsyncCallback} over through a thread local.
+ *
+ * @version $Revision$
+ */
+public class InterceptorToAsyncProcessorBridge extends ServiceSupport implements AsyncProcessor {
+
+    private final AsyncProcessor interceptor; // given to the constructor; invoked from process(Exchange)
+    private volatile AsyncProcessor target; // set via setTarget; invoked from process(Exchange, AsyncCallback)
+    private volatile ThreadLocal<AsyncCallback> callback = new ThreadLocal<AsyncCallback>();
+    private volatile ThreadLocal<Boolean> interceptorDone = new ThreadLocal<Boolean>();
+
+    /**
+     * Constructs the bridge; the given processor is converted to an {@link AsyncProcessor} and kept.
+     *
+     * @param interceptor the processor which is invoked whenever {@link #process(Exchange)} is called
+     */
+    public InterceptorToAsyncProcessorBridge(Processor interceptor) {
+        this.interceptor = AsyncProcessorTypeConverter.convert(interceptor);
+    }
+
+    /**
+     * Synchronous entry point; passes the exchange on together with the callback remembered for this thread.
+     * @param exchange the message exchange
+     * @throws Exception declared by the {@link Processor} contract
+     */
+    public void process(Exchange exchange) throws Exception {
+        // continue with the callback remembered for the current thread (null if none was set on this thread)
+        boolean done = interceptor.process(exchange, callback.get());
+        interceptorDone.set(done); // recorded so process(Exchange, AsyncCallback) can return it
+    }
+
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        // remember the callback for this thread so process(Exchange) can pick it up
+        this.callback.set(callback);
+        try {
+            // invoke the target, which may call back into process(Exchange) on this thread
+            boolean done = target.process(exchange, callback);
+            if (interceptorDone.get() != null) {
+                // process(Exchange) ran on this thread, so return the result it recorded
+                return interceptorDone.get();
+            } else {
+                // otherwise return the result reported by the target itself
+                return done;
+            }
+        } finally {
+            // always clear both thread locals so no state is left behind on this thread
+            this.callback.remove();
+            this.interceptorDone.remove();
+        }
+    }
+
+    public void setTarget(Processor target) {
+        this.target = AsyncProcessorTypeConverter.convert(target); // stored as an AsyncProcessor
+    }
+
+    @Override
+    public String toString() {
+        return "AsyncBridge[" + interceptor.toString() + "]";
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        // noop - nothing to initialize
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        callback.remove(); // note: ThreadLocal.remove() only clears the value of the calling thread
+        interceptorDone.remove();
+    }
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomAsyncInterceptorTest.java (from r957928, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomAsyncInterceptorTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomAsyncInterceptorTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java&r1=957928&r2=958008&rev=958008&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomAsyncInterceptorTest.java Fri Jun 25 15:41:09 2010
@@ -31,7 +31,7 @@ import org.apache.camel.spi.InterceptStr
 /**
  * @version $Revision$
  */
-public class AsyncEndpointCustomInterceptorTest extends ContextTestSupport {
+public class AsyncEndpointCustomAsyncInterceptorTest extends ContextTestSupport {
 
     private static String beforeThreadName;
     private static String afterThreadName;

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java?rev=958008&r1=958007&r2=958008&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java Fri Jun 25 15:41:09 2010
@@ -18,17 +18,20 @@ package org.apache.camel.processor.async
 
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.model.ProcessorDefinition;
-import org.apache.camel.processor.DelegateAsyncProcessor;
 import org.apache.camel.spi.InterceptStrategy;
 
 /**
+ * Uses a custom interceptor which is not an {@link org.apache.camel.AsyncProcessor}. Camel detects
+ * this and uses a bridge to adapt it, so the asynchronous routing engine can still run. This is
+ * not the most optimal solution, but it works. Camel logs a WARN so the user can see the issue
+ * and change the interceptor to comply.
+ *
  * @version $Revision$
  */
 public class AsyncEndpointCustomInterceptorTest extends ContextTestSupport {
@@ -88,18 +91,13 @@ public class AsyncEndpointCustomIntercep
         public Processor wrapProcessorInInterceptors(final CamelContext context, final ProcessorDefinition<?> definition,
                                                      final Processor target, final Processor nextTarget) throws Exception {
 
-            // use DelegateAsyncProcessor to ensure the interceptor works well with the asynchronous routing
-            // engine in Camel.
-            // The target is the processor to continue routing to, which we must provide
-            // in the constructor of the DelegateAsyncProcessor
-            return new DelegateAsyncProcessor(target) {
-                @Override
-                public boolean process(Exchange exchange, AsyncCallback callback) {
+            return new Processor() {
+                public void process(Exchange exchange) throws Exception {
                     // we just want to count number of interceptions
                     counter.incrementAndGet();
 
-                    // invoke super to continue routing the message
-                    return super.process(exchange, callback);
+                    // and continue processing the exchange
+                    target.process(exchange);
                 }
             };
         }