You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ge...@apache.org on 2009/02/03 11:15:36 UTC

svn commit: r740263 - in /camel/branches/camel-1.x: ./ camel-core/src/main/java/org/apache/camel/builder/ camel-core/src/main/java/org/apache/camel/processor/ camel-core/src/main/java/org/apache/camel/processor/interceptor/ camel-core/src/main/java/org...

Author: gertv
Date: Tue Feb  3 10:15:34 2009
New Revision: 740263

URL: http://svn.apache.org/viewvc?rev=740263&view=rev
Log:
Merged revisions 740251 via svnmerge from 
https://svn.eu.apache.org/repos/asf/camel/trunk

........
  r740251 | gertv | 2009-02-03 10:16:53 +0100 (Tue, 03 Feb 2009) | 1 line
  
  CAMEL-1271/CAMEL-520: Enable stream caching as an InterceptStrategy
........

Added:
    camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java
      - copied, changed from r740251, camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java
    camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java   (with props)
    camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/interceptor/StreamCachingTest.java
      - copied unchanged from r740251, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/StreamCachingTest.java
    camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java
      - copied unchanged from r740251, camel/trunk/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java
Modified:
    camel/branches/camel-1.x/   (props changed)
    camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
    camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
    camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
    camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetSedaTest.java
    camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java
    camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java
    camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/StreamSourceContentBasedRouterTest.java
    camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java

Propchange: camel/branches/camel-1.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb  3 10:15:34 2009
@@ -1 +1 @@
-/camel/trunk:739733,739904
+/camel/trunk:739733,739904,740251

Propchange: camel/branches/camel-1.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java?rev=740263&r1=740262&r2=740263&view=diff
==============================================================================
--- camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java (original)
+++ camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java Tue Feb  3 10:15:34 2009
@@ -25,6 +25,7 @@
 import org.apache.camel.processor.RecipientList;
 import org.apache.camel.processor.RedeliveryPolicy;
 import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
+import org.apache.camel.processor.interceptor.StreamCaching;
 import org.apache.camel.spi.RouteContext;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -66,6 +67,7 @@
     public Processor createErrorHandler(RouteContext routeContext, Processor processor) throws Exception {
         Processor deadLetter = getDeadLetterFactory().createProcessor();
         DeadLetterChannel answer = new DeadLetterChannel(processor, deadLetter, onRedelivery, getRedeliveryPolicy(), getLogger(), getExceptionPolicyStrategy());
+        StreamCaching.enable(routeContext);
         configure(answer);
         return answer;
     }

Modified: camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=740263&r1=740262&r2=740263&view=diff
==============================================================================
--- camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Tue Feb  3 10:15:34 2009
@@ -31,6 +31,7 @@
 import org.apache.camel.model.LoggingLevel;
 import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
 import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.MessageHelper;
 import org.apache.camel.util.ServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -174,6 +175,9 @@
                     exchange.setException(null);
                 }
 
+                // reset cached streams so they can be read again
+                MessageHelper.resetStreamCache(exchange.getIn());
+
                 // wait until we should redeliver
                 data.redeliveryDelay = data.currentRedeliveryPolicy.sleep(data.redeliveryDelay);
 

Copied: camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java (from r740251, camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java)
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java?p2=camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java&p1=camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java&r1=740251&r2=740263&rev=740263&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java (original)
+++ camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java Tue Feb  3 10:15:34 2009
@@ -24,7 +24,7 @@
 /**
  * {@link InterceptStrategy} implementation to configure stream caching on a RouteContext
  */
-public class StreamCaching implements InterceptStrategy {
+public final class StreamCaching implements InterceptStrategy {
     
     /*
      * Hide constructor -- instances will be created through static enable() methods

Modified: camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java?rev=740263&r1=740262&r2=740263&view=diff
==============================================================================
--- camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java (original)
+++ camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java Tue Feb  3 10:15:34 2009
@@ -25,6 +25,7 @@
 import org.apache.camel.model.InterceptorRef;
 import org.apache.camel.model.InterceptorType;
 import org.apache.camel.processor.Interceptor;
+import org.apache.camel.util.MessageHelper;
 
 /**
  * {@link Interceptor} that converts a message into a re-readable format
@@ -38,9 +39,9 @@
                 try {
                     StreamCache newBody = exchange.getIn().getBody(StreamCache.class);
                     if (newBody != null) {
-                        newBody.reset();
                         exchange.getIn().setBody(newBody);
                     }
+                    MessageHelper.resetStreamCache(exchange.getIn());
                 } catch (NoTypeConversionAvailableException ex) {
                     // ignore if in is not of StreamCache type
                 }

Added: camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java?rev=740263&view=auto
==============================================================================
--- camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java (added)
+++ camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java Tue Feb  3 10:15:34 2009
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util;
+
+import org.apache.camel.Message;
+import org.apache.camel.NoTypeConversionAvailableException;
+import org.apache.camel.converter.stream.StreamCache;
+
+/**
+ * Some helper methods when working with {@link org.apache.camel.Message}.
+ *
+ * @version $Revision: 740251 $
+ */
+public final class MessageHelper {
+
+    /**
+     * Utility classes should not have a public constructor.
+     */
+    private MessageHelper() {
+    }
+
+    /**
+     * Extracts the given body and returns it as a String, that
+     * can be used for logging etc.
+     * <p/>
+     * Will handle stream based bodies wrapped in StreamCache.
+     *
+     * @param message  the message with the body
+     * @return the body as String, can return <tt>null</tt> if no body
+     */
+    public static String extractBodyAsString(Message message) {
+        if (message == null) {
+            return null;
+        }
+
+        StreamCache newBody = null;
+        try {
+            newBody = message.getBody(StreamCache.class);
+            if (newBody != null) {
+                message.setBody(newBody);
+            }
+        } catch (NoTypeConversionAvailableException ex) {
+            // ignore, body is not of StreamCache type
+        }
+
+        Object answer;
+        try {
+            answer = message.getBody(String.class);
+        } catch (NoTypeConversionAvailableException ex) {
+            answer = message.getBody();
+        }
+
+        if (newBody != null) {
+            // Reset the InputStreamCache
+            newBody.reset();
+        }
+
+        return answer != null ? answer.toString() : null;
+    }
+
+    /**
+     * Gets the given body class type name as a String.
+     * <p/>
+     * Will skip java.lang. for the built-in Java types.
+     *
+     * @param message  the message with the body
+     * @return the body typename as String, can return <tt>null</tt> if no body
+     */
+    public static String getBodyTypeName(Message message) {
+        if (message == null) {
+            return null;
+        }
+        String answer = ObjectHelper.classCanonicalName(message.getBody());
+        if (answer != null && answer.startsWith("java.lang.")) {
+            return answer.substring(10);
+        }
+        return answer;
+    }
+    
+    /**
+     * If the message body contains a {@link StreamCache} instance, reset the cache to 
+     * enable reading from it again.
+     * 
+     * @param message the message for which to reset the body
+     */
+    public static void resetStreamCache(Message message) {
+        if (message == null) {
+            return;
+        }
+        if (message.getBody() instanceof StreamCache) {
+            ((StreamCache) message.getBody()).reset();
+        }
+    }
+}

Propchange: camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetSedaTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetSedaTest.java?rev=740263&r1=740262&r2=740263&view=diff
==============================================================================
--- camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetSedaTest.java (original)
+++ camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetSedaTest.java Tue Feb  3 10:15:34 2009
@@ -46,6 +46,9 @@
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
+                //TODO: remove this once the delegate processor supports async
+                errorHandler(noErrorHandler());
+                
                 from("dataset:foo").to("seda:queue:test?size=100");
                 from("seda:queue:test?size=100").to("dataset:foo");
             }

Modified: camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java?rev=740263&r1=740262&r2=740263&view=diff
==============================================================================
--- camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java (original)
+++ camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java Tue Feb  3 10:15:34 2009
@@ -19,6 +19,7 @@
 
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
+import java.io.Reader;
 import java.io.StringReader;
 
 import javax.xml.transform.stream.StreamSource;
@@ -68,23 +69,22 @@
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
-                streamCaching();
                 errorHandler(deadLetterChannel("direct:errorHandler").maximumRedeliveries(3));
                 from("direct:start").process(new Processor() {
                     public void process(Exchange exchange) throws Exception {
                         count++;
                         // Read the in stream from cache
                         String result = exchange.getIn().getBody(String.class);
-                        assertEquals("Should read the inputstream out again", result, "<hello>Willem</hello>");
+                        assertEquals("Should read the inputstream out again", "<hello>Willem</hello>", result);
                         throw new Exception("Forced exception by unit test");
                     }
                 });
 
                 //Need to set the streamCaching for the deadLetterChannel
-                from("direct:errorHandler").streamCaching().process(new Processor() {
+                from("direct:errorHandler").process(new Processor() {
                     public void process(Exchange exchange) throws Exception {
                         String result = exchange.getIn().getBody(String.class);
-                        assertEquals("Should read the inputstream out again", result, "<hello>Willem</hello>");
+                        assertEquals("Should read the inputstream out again", "<hello>Willem</hello>", result);
                     }
                 }).to("mock:error");
 

Modified: camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java?rev=740263&r1=740262&r2=740263&view=diff
==============================================================================
--- camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java (original)
+++ camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java Tue Feb  3 10:15:34 2009
@@ -104,8 +104,12 @@
             outputProcessor = interceptor.getProcessor();
         }
 
+        // we are not interested in any other delegate processors in the route (e.g. stream caching)
+        while (outputProcessor instanceof DelegateProcessor) {
+            outputProcessor = ((DelegateProcessor) outputProcessor).getProcessor();
+        }
+
         assertIsInstanceOf(StreamResequencer.class, outputProcessor);
     }
-
 }
 

Modified: camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/StreamSourceContentBasedRouterTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/StreamSourceContentBasedRouterTest.java?rev=740263&r1=740262&r2=740263&view=diff
==============================================================================
--- camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/StreamSourceContentBasedRouterTest.java (original)
+++ camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/StreamSourceContentBasedRouterTest.java Tue Feb  3 10:15:34 2009
@@ -37,7 +37,7 @@
     public void testSendStreamSource() throws Exception {
         x.expectedMessageCount(1);
         y.expectedMessageCount(1);
-     
+        
         sendBody("direct:start", new StreamSource(new StringReader("<message>xx</message>")));
         sendBody("direct:start", new StreamSource(new StringReader("<message>yy</message>")));
         
@@ -65,7 +65,7 @@
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                from("direct:start").convertBodyTo(String.class).choice()
+                from("direct:start").choice()
                   .when().xpath("/message/text() = 'xx'").to("mock:x")
                   .when().xpath("/message/text() = 'yy'").to("mock:y");
             }

Modified: camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java?rev=740263&r1=740262&r2=740263&view=diff
==============================================================================
--- camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java (original)
+++ camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java Tue Feb  3 10:15:34 2009
@@ -20,12 +20,16 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.model.ProcessorType;
+import org.apache.camel.processor.interceptor.Tracer;
+import org.apache.camel.spi.InterceptStrategy;
 
 /**
  * @version $Revision$
@@ -141,7 +145,8 @@
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                inheritErrorHandler(false);
+                //TODO: revert this once we get DelegateProcessor to support async
+                setErrorHandlerBuilder(noErrorHandler());
 
                 // START SNIPPET: example
                 from("direct:a").thread(1).process(new Processor() {