You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ge...@apache.org on 2009/02/09 20:46:51 UTC

svn commit: r742705 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java test/java/org/apache/camel/builder/RouteBuilderTest.java test/java/org/apache/camel/processor/ThreadTest.java

Author: gertv
Date: Mon Feb  9 19:46:50 2009
New Revision: 742705

URL: http://svn.apache.org/viewvc?rev=742705&view=rev
Log:
SM-1271: StreamCachingInterceptor should support async exchange handling

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java?rev=742705&r1=742704&r2=742705&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java Mon Feb  9 19:46:50 2009
@@ -18,36 +18,25 @@
 
 import java.util.List;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.Processor;
 import org.apache.camel.converter.stream.StreamCache;
 import org.apache.camel.model.InterceptorRef;
 import org.apache.camel.model.InterceptorType;
-import org.apache.camel.processor.Interceptor;
+import org.apache.camel.processor.DelegateProcessor;
+import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.MessageHelper;
 
 /**
- * {@link Interceptor} that converts a message into a re-readable format
+ * {@link DelegateProcessor} that converts a message into a re-readable format
  */
-public class StreamCachingInterceptor extends Interceptor {
+public class StreamCachingInterceptor extends DelegateProcessor implements AsyncProcessor {
 
     public StreamCachingInterceptor() {
         super();
-        setInterceptorLogic(new Processor() {
-            public void process(Exchange exchange) throws Exception {
-                try {
-                    StreamCache newBody = exchange.getIn().getBody(StreamCache.class);
-                    if (newBody != null) {
-                        exchange.getIn().setBody(newBody);
-                    }
-                    MessageHelper.resetStreamCache(exchange.getIn());
-                } catch (NoTypeConversionAvailableException ex) {
-                    // ignore if in is not of StreamCache type
-                }
-                proceed(exchange);
-            }
-        });
     }
 
     public StreamCachingInterceptor(Processor processor) {
@@ -74,4 +63,37 @@
             }
         }
     }
+    
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        AsyncProcessorHelper.process(this, exchange);
+    }
+
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        try {
+            StreamCache newBody = exchange.getIn().getBody(StreamCache.class);
+            if (newBody != null) {
+                exchange.getIn().setBody(newBody);
+            }
+            MessageHelper.resetStreamCache(exchange.getIn());
+        } catch (NoTypeConversionAvailableException ex) {
+            // ignore if in is not of StreamCache type
+        }
+        return proceed(exchange, callback);
+    } 
+
+    public boolean proceed(Exchange exchange, AsyncCallback callback) {
+        if (getProcessor() instanceof AsyncProcessor) {
+            return ((AsyncProcessor) getProcessor()).process(exchange, callback);
+        } else {
+            try {
+                processor.process(exchange);
+            } catch (Throwable e) {
+                exchange.setException(e);
+            }
+            // false means processing of the exchange asynchronously,
+            callback.done(true);
+            return true;
+        }
+    }
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java?rev=742705&r1=742704&r2=742705&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java Mon Feb  9 19:46:50 2009
@@ -250,7 +250,7 @@
             // take off the InstrumentationProcessor
             processor = unwrapDelegateProcessor(processor);
             // take off the StreamCacheInterceptor
-            processor = unwrapInterceptor(processor);
+            processor = unwrapDelegateProcessor(processor);
             MulticastProcessor multicastProcessor = assertIsInstanceOf(MulticastProcessor.class, processor);
             List<Processor> endpoints = new ArrayList<Processor>(multicastProcessor.getProcessors());
             assertEquals("Should have 2 endpoints", 2, endpoints.size());

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java?rev=742705&r1=742704&r2=742705&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java Mon Feb  9 19:46:50 2009
@@ -145,9 +145,6 @@
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                //TODO: revert this once we get DelegateProcessor to support async
-                setErrorHandlerBuilder(noErrorHandler());
-
                 // START SNIPPET: example
                 from("direct:a").thread(1).process(new Processor() {
                     public void process(Exchange exchange) throws Exception {