You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/06/25 17:41:09 UTC
svn commit: r958008 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/processor/
test/java/org/apache/camel/processor/async/
Author: davsclaus
Date: Fri Jun 25 15:41:09 2010
New Revision: 958008
URL: http://svn.apache.org/viewvc?rev=958008&view=rev
Log:
CAMEL-2836: Using a bridge to adapt custom interceptors to the async routing engine to let async still be possible.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomAsyncInterceptorTest.java
- copied, changed from r957928, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java?rev=958008&r1=958007&r2=958008&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java Fri Jun 25 15:41:09 2010
@@ -178,13 +178,22 @@ public class DefaultChannel extends Serv
if (strategy instanceof Tracer) {
continue;
}
- target = strategy.wrapProcessorInInterceptors(routeContext.getCamelContext(), outputDefinition, target, next);
- if (!(target instanceof AsyncProcessor)) {
- // warn if interceptor is not async compatible
+ Processor wrapped = strategy.wrapProcessorInInterceptors(routeContext.getCamelContext(), outputDefinition, target, next);
+ if (!(wrapped instanceof AsyncProcessor)) {
LOG.warn("Interceptor: " + strategy + " at: " + outputDefinition + " does not return an AsyncProcessor instance."
+ " This causes the asynchronous routing engine to not work as optimal as possible."
- + " See more details at the InterceptStrategy javadoc.");
+ + " See more details at the InterceptStrategy javadoc."
+ + " Camel will use a bridge to adapt the interceptor to the asynchronous routing engine,"
+ + " but its not the most optimal solution. Please consider changing your interceptor to comply.");
+
+ // use a bridge and wrap again which allows us to adapt and leverage the asynchronous routing engine anyway
+ // however its not the most optimal solution, but we can still run.
+ InterceptorToAsyncProcessorBridge bridge = new InterceptorToAsyncProcessorBridge(target);
+ wrapped = strategy.wrapProcessorInInterceptors(routeContext.getCamelContext(), outputDefinition, bridge, next);
+ bridge.setTarget(wrapped);
+ wrapped = bridge;
}
+ target = wrapped;
}
// sets the delegate to our wrapped output
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java?rev=958008&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java Fri Jun 25 15:41:09 2010
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
+
+/**
+ * A bridge to have regular interceptors implemented as {@link org.apache.camel.Processor}
+ * work with the asynchronous routing engine without causing side effects.
+ *
+ * @version $Revision$
+ */
+public class InterceptorToAsyncProcessorBridge extends ServiceSupport implements AsyncProcessor {
+
+ private final AsyncProcessor interceptor;
+ private volatile AsyncProcessor target;
+ private volatile ThreadLocal<AsyncCallback> callback = new ThreadLocal<AsyncCallback>();
+ private volatile ThreadLocal<Boolean> interceptorDone = new ThreadLocal<Boolean>();
+
+ /**
+ * Constructs the bridge
+ *
+ * @param interceptor the interceptor to bridge
+ */
+ public InterceptorToAsyncProcessorBridge(Processor interceptor) {
+ this.interceptor = AsyncProcessorTypeConverter.convert(interceptor);
+ }
+
+ /**
+ * Process invoked by the interceptor
+ * @param exchange the message exchange
+ * @throws Exception
+ */
+ public void process(Exchange exchange) throws Exception {
+ // invoke when interceptor wants to invoke
+ boolean done = interceptor.process(exchange, callback.get());
+ interceptorDone.set(done);
+ }
+
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ // remember the callback to be used by the interceptor
+ this.callback.set(callback);
+ try {
+ // invoke the target
+ boolean done = target.process(exchange, callback);
+ if (interceptorDone.get() != null) {
+ // return the result from the interceptor if it was invoked
+ return interceptorDone.get();
+ } else {
+ // otherwise from the target
+ return done;
+ }
+ } finally {
+ // cleanup
+ this.callback.remove();
+ this.interceptorDone.remove();
+ }
+ }
+
+ public void setTarget(Processor target) {
+ this.target = AsyncProcessorTypeConverter.convert(target);
+ }
+
+ @Override
+ public String toString() {
+ return "AsyncBridge[" + interceptor.toString() + "]";
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ // noop
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ callback.remove();
+ interceptorDone.remove();
+ }
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomAsyncInterceptorTest.java (from r957928, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomAsyncInterceptorTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomAsyncInterceptorTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java&r1=957928&r2=958008&rev=958008&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomAsyncInterceptorTest.java Fri Jun 25 15:41:09 2010
@@ -31,7 +31,7 @@ import org.apache.camel.spi.InterceptStr
/**
* @version $Revision$
*/
-public class AsyncEndpointCustomInterceptorTest extends ContextTestSupport {
+public class AsyncEndpointCustomAsyncInterceptorTest extends ContextTestSupport {
private static String beforeThreadName;
private static String afterThreadName;
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java?rev=958008&r1=958007&r2=958008&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java Fri Jun 25 15:41:09 2010
@@ -18,17 +18,20 @@ package org.apache.camel.processor.async
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.ProcessorDefinition;
-import org.apache.camel.processor.DelegateAsyncProcessor;
import org.apache.camel.spi.InterceptStrategy;
/**
+ * Using a custom interceptor which is not a {@link org.apache.camel.AsyncProcessor} which Camel
+ * detects and uses a bridge to adapt to so the asynchronous engine can still run. Albeit not
+ * the most optimal solution but it runs. Camel will log a WARN so user can see the issue
+ * and change his interceptor to comply.
+ *
* @version $Revision$
*/
public class AsyncEndpointCustomInterceptorTest extends ContextTestSupport {
@@ -88,18 +91,13 @@ public class AsyncEndpointCustomIntercep
public Processor wrapProcessorInInterceptors(final CamelContext context, final ProcessorDefinition<?> definition,
final Processor target, final Processor nextTarget) throws Exception {
- // use DelegateAsyncProcessor to ensure the interceptor works well with the asynchronous routing
- // engine in Camel.
- // The target is the processor to continue routing to, which we must provide
- // in the constructor of the DelegateAsyncProcessor
- return new DelegateAsyncProcessor(target) {
- @Override
- public boolean process(Exchange exchange, AsyncCallback callback) {
+ return new Processor() {
+ public void process(Exchange exchange) throws Exception {
// we just want to count number of interceptions
counter.incrementAndGet();
- // invoke super to continue routing the message
- return super.process(exchange, callback);
+ // and continue processing the exchange
+ target.process(exchange);
}
};
}