You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ge...@apache.org on 2009/02/03 11:15:36 UTC
svn commit: r740263 - in /camel/branches/camel-1.x: ./
camel-core/src/main/java/org/apache/camel/builder/
camel-core/src/main/java/org/apache/camel/processor/
camel-core/src/main/java/org/apache/camel/processor/interceptor/
camel-core/src/main/java/org...
Author: gertv
Date: Tue Feb 3 10:15:34 2009
New Revision: 740263
URL: http://svn.apache.org/viewvc?rev=740263&view=rev
Log:
Merged revisions 740251 via svnmerge from
https://svn.eu.apache.org/repos/asf/camel/trunk
........
r740251 | gertv | 2009-02-03 10:16:53 +0100 (Tue, 03 Feb 2009) | 1 line
CAMEL-1271/CAMEL-520: Enable stream caching as an InterceptStrategy
........
Added:
camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java
- copied, changed from r740251, camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java
camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java (with props)
camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/interceptor/StreamCachingTest.java
- copied unchanged from r740251, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/StreamCachingTest.java
camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java
- copied unchanged from r740251, camel/trunk/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java
Modified:
camel/branches/camel-1.x/ (props changed)
camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetSedaTest.java
camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java
camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java
camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/StreamSourceContentBasedRouterTest.java
camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java
Propchange: camel/branches/camel-1.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb 3 10:15:34 2009
@@ -1 +1 @@
-/camel/trunk:739733,739904
+/camel/trunk:739733,739904,740251
Propchange: camel/branches/camel-1.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified: camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java?rev=740263&r1=740262&r2=740263&view=diff
==============================================================================
--- camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java (original)
+++ camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java Tue Feb 3 10:15:34 2009
@@ -25,6 +25,7 @@
import org.apache.camel.processor.RecipientList;
import org.apache.camel.processor.RedeliveryPolicy;
import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
+import org.apache.camel.processor.interceptor.StreamCaching;
import org.apache.camel.spi.RouteContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -66,6 +67,7 @@
public Processor createErrorHandler(RouteContext routeContext, Processor processor) throws Exception {
Processor deadLetter = getDeadLetterFactory().createProcessor();
DeadLetterChannel answer = new DeadLetterChannel(processor, deadLetter, onRedelivery, getRedeliveryPolicy(), getLogger(), getExceptionPolicyStrategy());
+ StreamCaching.enable(routeContext);
configure(answer);
return answer;
}
Modified: camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=740263&r1=740262&r2=740263&view=diff
==============================================================================
--- camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Tue Feb 3 10:15:34 2009
@@ -31,6 +31,7 @@
import org.apache.camel.model.LoggingLevel;
import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.MessageHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -174,6 +175,9 @@
exchange.setException(null);
}
+ // reset cached streams so they can be read again
+ MessageHelper.resetStreamCache(exchange.getIn());
+
// wait until we should redeliver
data.redeliveryDelay = data.currentRedeliveryPolicy.sleep(data.redeliveryDelay);
Copied: camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java (from r740251, camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java)
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java?p2=camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java&p1=camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java&r1=740251&r2=740263&rev=740263&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java (original)
+++ camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java Tue Feb 3 10:15:34 2009
@@ -24,7 +24,7 @@
/**
* {@link InterceptStrategy} implementation to configure stream caching on a RouteContext
*/
-public class StreamCaching implements InterceptStrategy {
+public final class StreamCaching implements InterceptStrategy {
/*
* Hide constructor -- instances will be created through static enable() methods
Modified: camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java?rev=740263&r1=740262&r2=740263&view=diff
==============================================================================
--- camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java (original)
+++ camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java Tue Feb 3 10:15:34 2009
@@ -25,6 +25,7 @@
import org.apache.camel.model.InterceptorRef;
import org.apache.camel.model.InterceptorType;
import org.apache.camel.processor.Interceptor;
+import org.apache.camel.util.MessageHelper;
/**
* {@link Interceptor} that converts a message into a re-readable format
@@ -38,9 +39,9 @@
try {
StreamCache newBody = exchange.getIn().getBody(StreamCache.class);
if (newBody != null) {
- newBody.reset();
exchange.getIn().setBody(newBody);
}
+ MessageHelper.resetStreamCache(exchange.getIn());
} catch (NoTypeConversionAvailableException ex) {
// ignore if in is not of StreamCache type
}
Added: camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java?rev=740263&view=auto
==============================================================================
--- camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java (added)
+++ camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java Tue Feb 3 10:15:34 2009
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util;
+
+import org.apache.camel.Message;
+import org.apache.camel.NoTypeConversionAvailableException;
+import org.apache.camel.converter.stream.StreamCache;
+
+/**
+ * Some helper methods when working with {@link org.apache.camel.Message}.
+ *
+ * @version $Revision: 740251 $
+ */
+public final class MessageHelper {
+
+ /**
+ * Utility classes should not have a public constructor.
+ */
+ private MessageHelper() {
+ }
+
+ /**
+ * Extracts the given body and returns it as a String, that
+ * can be used for logging etc.
+ * <p/>
+ * Will handle stream based bodies wrapped in StreamCache.
+ *
+ * @param message the message with the body
+ * @return the body as String, can return <tt>null</tt> if no body
+ */
+ public static String extractBodyAsString(Message message) {
+ if (message == null) {
+ return null;
+ }
+
+ StreamCache newBody = null;
+ try {
+ newBody = message.getBody(StreamCache.class);
+ if (newBody != null) {
+ message.setBody(newBody);
+ }
+ } catch (NoTypeConversionAvailableException ex) {
+ // ignore, the body is not of StreamCache type
+ }
+
+ Object answer;
+ try {
+ answer = message.getBody(String.class);
+ } catch (NoTypeConversionAvailableException ex) {
+ answer = message.getBody();
+ }
+
+ if (newBody != null) {
+ // Reset the InputStreamCache
+ newBody.reset();
+ }
+
+ return answer != null ? answer.toString() : null;
+ }
+
+ /**
+ * Gets the given body class type name as a String.
+ * <p/>
+ * Will strip the leading java.lang. prefix for the built-in Java types.
+ *
+ * @param message the message with the body
+ * @return the body typename as String, can return <tt>null</tt> if no body
+ */
+ public static String getBodyTypeName(Message message) {
+ if (message == null) {
+ return null;
+ }
+ String answer = ObjectHelper.classCanonicalName(message.getBody());
+ if (answer != null && answer.startsWith("java.lang.")) {
+ return answer.substring(10);
+ }
+ return answer;
+ }
+
+ /**
+ * If the message body contains a {@link StreamCache} instance, reset the cache to
+ * enable reading from it again.
+ *
+ * @param message the message for which to reset the body
+ */
+ public static void resetStreamCache(Message message) {
+ if (message == null) {
+ return;
+ }
+ if (message.getBody() instanceof StreamCache) {
+ ((StreamCache) message.getBody()).reset();
+ }
+ }
+}
Propchange: camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetSedaTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetSedaTest.java?rev=740263&r1=740262&r2=740263&view=diff
==============================================================================
--- camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetSedaTest.java (original)
+++ camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetSedaTest.java Tue Feb 3 10:15:34 2009
@@ -46,6 +46,9 @@
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
+ //TODO: remove this once the delegate processor supports async
+ errorHandler(noErrorHandler());
+
from("dataset:foo").to("seda:queue:test?size=100");
from("seda:queue:test?size=100").to("dataset:foo");
}
Modified: camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java?rev=740263&r1=740262&r2=740263&view=diff
==============================================================================
--- camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java (original)
+++ camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java Tue Feb 3 10:15:34 2009
@@ -19,6 +19,7 @@
import java.io.ByteArrayInputStream;
import java.io.InputStream;
+import java.io.Reader;
import java.io.StringReader;
import javax.xml.transform.stream.StreamSource;
@@ -68,23 +69,22 @@
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
- streamCaching();
errorHandler(deadLetterChannel("direct:errorHandler").maximumRedeliveries(3));
from("direct:start").process(new Processor() {
public void process(Exchange exchange) throws Exception {
count++;
// Read the in stream from cache
String result = exchange.getIn().getBody(String.class);
- assertEquals("Should read the inputstream out again", result, "<hello>Willem</hello>");
+ assertEquals("Should read the inputstream out again", "<hello>Willem</hello>", result);
throw new Exception("Forced exception by unit test");
}
});
//Need to set the streamCaching for the deadLetterChannel
- from("direct:errorHandler").streamCaching().process(new Processor() {
+ from("direct:errorHandler").process(new Processor() {
public void process(Exchange exchange) throws Exception {
String result = exchange.getIn().getBody(String.class);
- assertEquals("Should read the inputstream out again", result, "<hello>Willem</hello>");
+ assertEquals("Should read the inputstream out again", "<hello>Willem</hello>", result);
}
}).to("mock:error");
Modified: camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java?rev=740263&r1=740262&r2=740263&view=diff
==============================================================================
--- camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java (original)
+++ camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java Tue Feb 3 10:15:34 2009
@@ -104,8 +104,12 @@
outputProcessor = interceptor.getProcessor();
}
+ // we are not interested in any other delegate processors in the route (e.g. stream caching)
+ while (outputProcessor instanceof DelegateProcessor) {
+ outputProcessor = ((DelegateProcessor) outputProcessor).getProcessor();
+ }
+
assertIsInstanceOf(StreamResequencer.class, outputProcessor);
}
-
}
Modified: camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/StreamSourceContentBasedRouterTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/StreamSourceContentBasedRouterTest.java?rev=740263&r1=740262&r2=740263&view=diff
==============================================================================
--- camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/StreamSourceContentBasedRouterTest.java (original)
+++ camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/StreamSourceContentBasedRouterTest.java Tue Feb 3 10:15:34 2009
@@ -37,7 +37,7 @@
public void testSendStreamSource() throws Exception {
x.expectedMessageCount(1);
y.expectedMessageCount(1);
-
+
sendBody("direct:start", new StreamSource(new StringReader("<message>xx</message>")));
sendBody("direct:start", new StreamSource(new StringReader("<message>yy</message>")));
@@ -65,7 +65,7 @@
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
- from("direct:start").convertBodyTo(String.class).choice()
+ from("direct:start").choice()
.when().xpath("/message/text() = 'xx'").to("mock:x")
.when().xpath("/message/text() = 'yy'").to("mock:y");
}
Modified: camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java?rev=740263&r1=740262&r2=740263&view=diff
==============================================================================
--- camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java (original)
+++ camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java Tue Feb 3 10:15:34 2009
@@ -20,12 +20,16 @@
import java.util.concurrent.TimeUnit;
import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.model.ProcessorType;
+import org.apache.camel.processor.interceptor.Tracer;
+import org.apache.camel.spi.InterceptStrategy;
/**
* @version $Revision$
@@ -141,7 +145,8 @@
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
- inheritErrorHandler(false);
+ //TODO: revert this once we get DelegateProcessor to support async
+ setErrorHandlerBuilder(noErrorHandler());
// START SNIPPET: example
from("direct:a").thread(1).process(new Processor() {