You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ge...@apache.org on 2009/02/03 10:16:55 UTC

svn commit: r740251 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/builder/ main/java/org/apache/camel/processor/ main/java/org/apache/camel/processor/interceptor/ main/java/org/apache/camel/util/ test/java/org/apache/camel/component/data...

Author: gertv
Date: Tue Feb  3 09:16:53 2009
New Revision: 740251

URL: http://svn.apache.org/viewvc?rev=740251&view=rev
Log:
CAMEL-1271/CAMEL-520: Enable stream caching as an InterceptStrategy

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/StreamCachingTest.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java   (with props)
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetSedaTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamSourceContentBasedRouterTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java?rev=740251&r1=740250&r2=740251&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java Tue Feb  3 09:16:53 2009
@@ -25,6 +25,7 @@
 import org.apache.camel.processor.RecipientList;
 import org.apache.camel.processor.RedeliveryPolicy;
 import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
+import org.apache.camel.processor.interceptor.StreamCaching;
 import org.apache.camel.spi.RouteContext;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -66,6 +67,7 @@
     public Processor createErrorHandler(RouteContext routeContext, Processor processor) throws Exception {
         Processor deadLetter = getDeadLetterFactory().createProcessor();
         DeadLetterChannel answer = new DeadLetterChannel(processor, deadLetter, onRedelivery, getRedeliveryPolicy(), getLogger(), getExceptionPolicyStrategy());
+        StreamCaching.enable(routeContext);
         configure(answer);
         return answer;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=740251&r1=740250&r2=740251&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Tue Feb  3 09:16:53 2009
@@ -31,6 +31,7 @@
 import org.apache.camel.model.LoggingLevel;
 import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
 import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.MessageHelper;
 import org.apache.camel.util.ServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -175,6 +176,9 @@
                     exchange.setException(null);
                 }
 
+                // reset cached streams so they can be read again
+                MessageHelper.resetStreamCache(exchange.getIn());
+
                 // wait until we should redeliver
                 data.redeliveryDelay = data.currentRedeliveryPolicy.sleep(data.redeliveryDelay);
 

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java?rev=740251&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java Tue Feb  3 09:16:53 2009
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.interceptor;
+
+import org.apache.camel.Processor;
+import org.apache.camel.model.ProcessorType;
+import org.apache.camel.spi.InterceptStrategy;
+import org.apache.camel.spi.RouteContext;
+
+/**
+ * {@link InterceptStrategy} implementation to configure stream caching on a RouteContext
+ */
+public class StreamCaching implements InterceptStrategy {
+    
+    /*
+     * Hide constructor -- instances will be created through static enable() methods
+     */
+    private StreamCaching() {
+        super();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @SuppressWarnings("unchecked")
+    public Processor wrapProcessorInInterceptors(ProcessorType processorType, Processor target) throws Exception {
+        return new StreamCachingInterceptor(target);
+    }
+    
+    /**
+     * Enable stream caching for a RouteContext
+     * 
+     * @param context the route context
+     */
+    public static void enable(RouteContext context) {
+        for (InterceptStrategy strategy : context.getInterceptStrategies()) {
+            if (strategy instanceof StreamCaching) {
+                return;
+            }
+        }
+        context.addInterceptStrategy(new StreamCaching());
+    }
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java?rev=740251&r1=740250&r2=740251&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java Tue Feb  3 09:16:53 2009
@@ -25,6 +25,7 @@
 import org.apache.camel.model.InterceptorRef;
 import org.apache.camel.model.InterceptorType;
 import org.apache.camel.processor.Interceptor;
+import org.apache.camel.util.MessageHelper;
 
 /**
  * {@link Interceptor} that converts a message into a re-readable format
@@ -38,9 +39,9 @@
                 try {
                     StreamCache newBody = exchange.getIn().getBody(StreamCache.class);
                     if (newBody != null) {
-                        newBody.reset();
                         exchange.getIn().setBody(newBody);
                     }
+                    MessageHelper.resetStreamCache(exchange.getIn());
                 } catch (NoTypeConversionAvailableException ex) {
                     // ignore if in is not of StreamCache type
                 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java?rev=740251&r1=740250&r2=740251&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java Tue Feb  3 09:16:53 2009
@@ -90,4 +90,19 @@
         }
         return answer;
     }
+    
+    /**
+     * If the message body contains a {@link StreamCache} instance, reset the cache to 
+     * enable reading from it again.
+     * 
+     * @param message the message for which to reset the body
+     */
+    public static void resetStreamCache(Message message) {
+        if (message == null) {
+            return;
+        }
+        if (message.getBody() instanceof StreamCache) {
+            ((StreamCache) message.getBody()).reset();
+        }
+    }
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetSedaTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetSedaTest.java?rev=740251&r1=740250&r2=740251&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetSedaTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetSedaTest.java Tue Feb  3 09:16:53 2009
@@ -46,6 +46,9 @@
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
+                //TODO: remove this once the delegate processor supports async
+                errorHandler(noErrorHandler());
+                
                 from("dataset:foo").to("seda:queue:test?size=100");
                 from("seda:queue:test?size=100").to("dataset:foo");
             }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java?rev=740251&r1=740250&r2=740251&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java Tue Feb  3 09:16:53 2009
@@ -19,6 +19,7 @@
 
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
+import java.io.Reader;
 import java.io.StringReader;
 
 import javax.xml.transform.stream.StreamSource;
@@ -68,23 +69,22 @@
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
-                streamCaching();
                 errorHandler(deadLetterChannel("direct:errorHandler").maximumRedeliveries(3));
                 from("direct:start").process(new Processor() {
                     public void process(Exchange exchange) throws Exception {
                         count++;
                         // Read the in stream from cache
                         String result = exchange.getIn().getBody(String.class);
-                        assertEquals("Should read the inputstream out again", result, "<hello>Willem</hello>");
+                        assertEquals("Should read the inputstream out again", "<hello>Willem</hello>", result);
                         throw new Exception("Forced exception by unit test");
                     }
                 });
 
                 //Need to set the streamCaching for the deadLetterChannel
-                from("direct:errorHandler").streamCaching().process(new Processor() {
+                from("direct:errorHandler").process(new Processor() {
                     public void process(Exchange exchange) throws Exception {
                         String result = exchange.getIn().getBody(String.class);
-                        assertEquals("Should read the inputstream out again", result, "<hello>Willem</hello>");
+                        assertEquals("Should read the inputstream out again", "<hello>Willem</hello>", result);
                     }
                 }).to("mock:error");
 

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java?rev=740251&r1=740250&r2=740251&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java Tue Feb  3 09:16:53 2009
@@ -104,8 +104,12 @@
             outputProcessor = interceptor.getProcessor();
         }
 
+        // we are not interested in any other delegate processors in the route (e.g. stream caching)
+        while (outputProcessor instanceof DelegateProcessor) {
+            outputProcessor = ((DelegateProcessor) outputProcessor).getProcessor();
+        }
+
         assertIsInstanceOf(StreamResequencer.class, outputProcessor);
     }
-
 }
 

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamSourceContentBasedRouterTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamSourceContentBasedRouterTest.java?rev=740251&r1=740250&r2=740251&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamSourceContentBasedRouterTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamSourceContentBasedRouterTest.java Tue Feb  3 09:16:53 2009
@@ -37,7 +37,7 @@
     public void testSendStreamSource() throws Exception {
         x.expectedMessageCount(1);
         y.expectedMessageCount(1);
-     
+        
         sendBody("direct:start", new StreamSource(new StringReader("<message>xx</message>")));
         sendBody("direct:start", new StreamSource(new StringReader("<message>yy</message>")));
         
@@ -65,7 +65,7 @@
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                from("direct:start").convertBodyTo(String.class).choice()
+                from("direct:start").choice()
                   .when().xpath("/message/text() = 'xx'").to("mock:x")
                   .when().xpath("/message/text() = 'yy'").to("mock:y");
             }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java?rev=740251&r1=740250&r2=740251&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java Tue Feb  3 09:16:53 2009
@@ -20,12 +20,16 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.model.ProcessorType;
+import org.apache.camel.processor.interceptor.Tracer;
+import org.apache.camel.spi.InterceptStrategy;
 
 /**
  * @version $Revision$
@@ -141,7 +145,8 @@
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                inheritErrorHandler(false);
+                //TODO: revert this once we get DelegateProcessor to support async
+                setErrorHandlerBuilder(noErrorHandler());
 
                 // START SNIPPET: example
                 from("direct:a").thread(1).process(new Processor() {

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/StreamCachingTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/StreamCachingTest.java?rev=740251&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/StreamCachingTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/StreamCachingTest.java Tue Feb  3 09:16:53 2009
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.interceptor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.impl.DefaultRouteContext;
+import org.apache.camel.spi.InterceptStrategy;
+import org.apache.camel.spi.RouteContext;
+
+/**
+ * Test cases for {@link StreamCaching}
+ */
+public class StreamCachingTest extends ContextTestSupport {
+
+    /**
+     * Tests enabling stream caching on a {@link RouteContext}
+     */
+    public void testEnableOnRouteContext() throws Exception {
+        RouteContext rc = new DefaultRouteContext(super.context);
+        StreamCaching.enable(rc);
+        assertStrategyEnabled("Enabling StreamCaching should add it to the intercept strategies", rc);
+        StreamCaching.enable(rc);
+        assertStrategyEnabled("Enabling it again should not add a second instance", rc);
+    }
+
+    /*
+     * Assert that the strategy is enabled exactly one time
+     */
+    private void assertStrategyEnabled(String message, RouteContext rc) {
+        int count = 0;
+        for (InterceptStrategy strategy : rc.getInterceptStrategies()) {
+            if (strategy instanceof StreamCaching) {
+                count++;
+            }
+        }
+        assertEquals(message, 1, count);
+    }
+
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/StreamCachingTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java?rev=740251&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java Tue Feb  3 09:16:53 2009
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util;
+
+import junit.framework.TestCase;
+
+import org.apache.camel.Message;
+import org.apache.camel.converter.stream.StreamCache;
+import org.apache.camel.impl.DefaultMessage;
+
+/**
+ * Test cases for {@link MessageHelper}
+ */
+public class MessageHelperTest extends TestCase {
+    
+    private Message message;
+    
+    @Override
+    protected void setUp() throws Exception {
+        message = new DefaultMessage();
+    }
+
+    /*
+     * Tests the {@link MessageHelper#resetStreamCache(Message)} method
+     */
+    public void testResetStreamCache() throws Exception {
+        // should not throw exceptions when Message or message body is null
+        MessageHelper.resetStreamCache((Message) null);
+        MessageHelper.resetStreamCache(message);
+        
+        // handle StreamCache
+        final ValueHolder<Boolean> reset = new ValueHolder<Boolean>(Boolean.FALSE);
+        message.setBody(new StreamCache() {
+            public void reset() {
+                reset.set(Boolean.TRUE);
+            }
+        });
+        MessageHelper.resetStreamCache(message);
+        assertTrue("Should have reset the stream cache", reset.get());
+    }
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java
------------------------------------------------------------------------------
    svn:eol-style = native