You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ge...@apache.org on 2009/02/09 20:46:51 UTC
svn commit: r742705 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
test/java/org/apache/camel/builder/RouteBuilderTest.java
test/java/org/apache/camel/processor/ThreadTest.java
Author: gertv
Date: Mon Feb 9 19:46:50 2009
New Revision: 742705
URL: http://svn.apache.org/viewvc?rev=742705&view=rev
Log:
SM-1271: StreamCachingInterceptor should support async exchange handling
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java?rev=742705&r1=742704&r2=742705&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java Mon Feb 9 19:46:50 2009
@@ -18,36 +18,25 @@
import java.util.List;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Processor;
import org.apache.camel.converter.stream.StreamCache;
import org.apache.camel.model.InterceptorRef;
import org.apache.camel.model.InterceptorType;
-import org.apache.camel.processor.Interceptor;
+import org.apache.camel.processor.DelegateProcessor;
+import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.MessageHelper;
/**
- * {@link Interceptor} that converts a message into a re-readable format
+ * {@link DelegateProcessor} that converts a message into a re-readable format
*/
-public class StreamCachingInterceptor extends Interceptor {
+public class StreamCachingInterceptor extends DelegateProcessor implements AsyncProcessor {
public StreamCachingInterceptor() {
super();
- setInterceptorLogic(new Processor() {
- public void process(Exchange exchange) throws Exception {
- try {
- StreamCache newBody = exchange.getIn().getBody(StreamCache.class);
- if (newBody != null) {
- exchange.getIn().setBody(newBody);
- }
- MessageHelper.resetStreamCache(exchange.getIn());
- } catch (NoTypeConversionAvailableException ex) {
- // ignore if in is not of StreamCache type
- }
- proceed(exchange);
- }
- });
}
public StreamCachingInterceptor(Processor processor) {
@@ -74,4 +63,37 @@
}
}
}
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ AsyncProcessorHelper.process(this, exchange);
+ }
+
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ try {
+ StreamCache newBody = exchange.getIn().getBody(StreamCache.class);
+ if (newBody != null) {
+ exchange.getIn().setBody(newBody);
+ }
+ MessageHelper.resetStreamCache(exchange.getIn());
+ } catch (NoTypeConversionAvailableException ex) {
+ // ignore if in is not of StreamCache type
+ }
+ return proceed(exchange, callback);
+ }
+
+ public boolean proceed(Exchange exchange, AsyncCallback callback) {
+ if (getProcessor() instanceof AsyncProcessor) {
+ return ((AsyncProcessor) getProcessor()).process(exchange, callback);
+ } else {
+ try {
+ processor.process(exchange);
+ } catch (Throwable e) {
+ exchange.setException(e);
+ }
+ // false means processing of the exchange asynchronously,
+ callback.done(true);
+ return true;
+ }
+ }
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java?rev=742705&r1=742704&r2=742705&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java Mon Feb 9 19:46:50 2009
@@ -250,7 +250,7 @@
// take off the InstrumentationProcessor
processor = unwrapDelegateProcessor(processor);
// take off the StreamCacheInterceptor
- processor = unwrapInterceptor(processor);
+ processor = unwrapDelegateProcessor(processor);
MulticastProcessor multicastProcessor = assertIsInstanceOf(MulticastProcessor.class, processor);
List<Processor> endpoints = new ArrayList<Processor>(multicastProcessor.getProcessors());
assertEquals("Should have 2 endpoints", 2, endpoints.size());
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java?rev=742705&r1=742704&r2=742705&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java Mon Feb 9 19:46:50 2009
@@ -145,9 +145,6 @@
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
- //TODO: revert this once we get DelegateProcessor to support async
- setErrorHandlerBuilder(noErrorHandler());
-
// START SNIPPET: example
from("direct:a").thread(1).process(new Processor() {
public void process(Exchange exchange) throws Exception {