You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ge...@apache.org on 2009/02/03 10:16:55 UTC
svn commit: r740251 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/builder/ main/java/org/apache/camel/processor/
main/java/org/apache/camel/processor/interceptor/
main/java/org/apache/camel/util/ test/java/org/apache/camel/component/data...
Author: gertv
Date: Tue Feb 3 09:16:53 2009
New Revision: 740251
URL: http://svn.apache.org/viewvc?rev=740251&view=rev
Log:
CAMEL-1271/CAMEL-520: Enable stream caching as an InterceptStrategy
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/StreamCachingTest.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java (with props)
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetSedaTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamSourceContentBasedRouterTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java?rev=740251&r1=740250&r2=740251&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java Tue Feb 3 09:16:53 2009
@@ -25,6 +25,7 @@
import org.apache.camel.processor.RecipientList;
import org.apache.camel.processor.RedeliveryPolicy;
import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
+import org.apache.camel.processor.interceptor.StreamCaching;
import org.apache.camel.spi.RouteContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -66,6 +67,7 @@
public Processor createErrorHandler(RouteContext routeContext, Processor processor) throws Exception {
Processor deadLetter = getDeadLetterFactory().createProcessor();
DeadLetterChannel answer = new DeadLetterChannel(processor, deadLetter, onRedelivery, getRedeliveryPolicy(), getLogger(), getExceptionPolicyStrategy());
+ StreamCaching.enable(routeContext);
configure(answer);
return answer;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=740251&r1=740250&r2=740251&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Tue Feb 3 09:16:53 2009
@@ -31,6 +31,7 @@
import org.apache.camel.model.LoggingLevel;
import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.MessageHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -175,6 +176,9 @@
exchange.setException(null);
}
+ // reset cached streams so they can be read again
+ MessageHelper.resetStreamCache(exchange.getIn());
+
// wait until we should redeliver
data.redeliveryDelay = data.currentRedeliveryPolicy.sleep(data.redeliveryDelay);
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java?rev=740251&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java Tue Feb 3 09:16:53 2009
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.interceptor;
+
+import org.apache.camel.Processor;
+import org.apache.camel.model.ProcessorType;
+import org.apache.camel.spi.InterceptStrategy;
+import org.apache.camel.spi.RouteContext;
+
+/**
+ * {@link InterceptStrategy} implementation to configure stream caching on a RouteContext
+ */
+public class StreamCaching implements InterceptStrategy {
+
+ /*
+ * Hide constructor -- instances will be created through static enable() methods
+ */
+ private StreamCaching() {
+ super();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @SuppressWarnings("unchecked")
+ public Processor wrapProcessorInInterceptors(ProcessorType processorType, Processor target) throws Exception {
+ return new StreamCachingInterceptor(target);
+ }
+
+ /**
+ * Enable stream caching for a RouteContext
+ *
+ * @param context the route context
+ */
+ public static void enable(RouteContext context) {
+ for (InterceptStrategy strategy : context.getInterceptStrategies()) {
+ if (strategy instanceof StreamCaching) {
+ return;
+ }
+ }
+ context.addInterceptStrategy(new StreamCaching());
+ }
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java?rev=740251&r1=740250&r2=740251&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java Tue Feb 3 09:16:53 2009
@@ -25,6 +25,7 @@
import org.apache.camel.model.InterceptorRef;
import org.apache.camel.model.InterceptorType;
import org.apache.camel.processor.Interceptor;
+import org.apache.camel.util.MessageHelper;
/**
* {@link Interceptor} that converts a message into a re-readable format
@@ -38,9 +39,9 @@
try {
StreamCache newBody = exchange.getIn().getBody(StreamCache.class);
if (newBody != null) {
- newBody.reset();
exchange.getIn().setBody(newBody);
}
+ MessageHelper.resetStreamCache(exchange.getIn());
} catch (NoTypeConversionAvailableException ex) {
// ignore if in is not of StreamCache type
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java?rev=740251&r1=740250&r2=740251&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java Tue Feb 3 09:16:53 2009
@@ -90,4 +90,19 @@
}
return answer;
}
+
+ /**
+ * If the message body contains a {@link StreamCache} instance, reset the cache to
+ * enable reading from it again.
+ *
+ * @param message the message for which to reset the body
+ */
+ public static void resetStreamCache(Message message) {
+ if (message == null) {
+ return;
+ }
+ if (message.getBody() instanceof StreamCache) {
+ ((StreamCache) message.getBody()).reset();
+ }
+ }
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetSedaTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetSedaTest.java?rev=740251&r1=740250&r2=740251&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetSedaTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetSedaTest.java Tue Feb 3 09:16:53 2009
@@ -46,6 +46,9 @@
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
+ //TODO: remove this once the delegate processor supports async
+ errorHandler(noErrorHandler());
+
from("dataset:foo").to("seda:queue:test?size=100");
from("seda:queue:test?size=100").to("dataset:foo");
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java?rev=740251&r1=740250&r2=740251&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java Tue Feb 3 09:16:53 2009
@@ -19,6 +19,7 @@
import java.io.ByteArrayInputStream;
import java.io.InputStream;
+import java.io.Reader;
import java.io.StringReader;
import javax.xml.transform.stream.StreamSource;
@@ -68,23 +69,22 @@
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
- streamCaching();
errorHandler(deadLetterChannel("direct:errorHandler").maximumRedeliveries(3));
from("direct:start").process(new Processor() {
public void process(Exchange exchange) throws Exception {
count++;
// Read the in stream from cache
String result = exchange.getIn().getBody(String.class);
- assertEquals("Should read the inputstream out again", result, "<hello>Willem</hello>");
+ assertEquals("Should read the inputstream out again", "<hello>Willem</hello>", result);
throw new Exception("Forced exception by unit test");
}
});
//Need to set the streamCaching for the deadLetterChannel
- from("direct:errorHandler").streamCaching().process(new Processor() {
+ from("direct:errorHandler").process(new Processor() {
public void process(Exchange exchange) throws Exception {
String result = exchange.getIn().getBody(String.class);
- assertEquals("Should read the inputstream out again", result, "<hello>Willem</hello>");
+ assertEquals("Should read the inputstream out again", "<hello>Willem</hello>", result);
}
}).to("mock:error");
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java?rev=740251&r1=740250&r2=740251&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java Tue Feb 3 09:16:53 2009
@@ -104,8 +104,12 @@
outputProcessor = interceptor.getProcessor();
}
+ // we are not interested in any other delegate processors in the route (e.g. stream caching)
+ while (outputProcessor instanceof DelegateProcessor) {
+ outputProcessor = ((DelegateProcessor) outputProcessor).getProcessor();
+ }
+
assertIsInstanceOf(StreamResequencer.class, outputProcessor);
}
-
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamSourceContentBasedRouterTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamSourceContentBasedRouterTest.java?rev=740251&r1=740250&r2=740251&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamSourceContentBasedRouterTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamSourceContentBasedRouterTest.java Tue Feb 3 09:16:53 2009
@@ -37,7 +37,7 @@
public void testSendStreamSource() throws Exception {
x.expectedMessageCount(1);
y.expectedMessageCount(1);
-
+
sendBody("direct:start", new StreamSource(new StringReader("<message>xx</message>")));
sendBody("direct:start", new StreamSource(new StringReader("<message>yy</message>")));
@@ -65,7 +65,7 @@
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
- from("direct:start").convertBodyTo(String.class).choice()
+ from("direct:start").choice()
.when().xpath("/message/text() = 'xx'").to("mock:x")
.when().xpath("/message/text() = 'yy'").to("mock:y");
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java?rev=740251&r1=740250&r2=740251&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java Tue Feb 3 09:16:53 2009
@@ -20,12 +20,16 @@
import java.util.concurrent.TimeUnit;
import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.model.ProcessorType;
+import org.apache.camel.processor.interceptor.Tracer;
+import org.apache.camel.spi.InterceptStrategy;
/**
* @version $Revision$
@@ -141,7 +145,8 @@
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
- inheritErrorHandler(false);
+ //TODO: revert this once we get DelegateProcessor to support async
+ setErrorHandlerBuilder(noErrorHandler());
// START SNIPPET: example
from("direct:a").thread(1).process(new Processor() {
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/StreamCachingTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/StreamCachingTest.java?rev=740251&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/StreamCachingTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/StreamCachingTest.java Tue Feb 3 09:16:53 2009
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.interceptor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.impl.DefaultRouteContext;
+import org.apache.camel.spi.InterceptStrategy;
+import org.apache.camel.spi.RouteContext;
+
+/**
+ * Test cases for {@link StreamCaching}
+ */
+public class StreamCachingTest extends ContextTestSupport {
+
+ /**
+ * Tests enabling stream caching on a {@link RouteContext}
+ */
+ public void testEnableOnRouteContext() throws Exception {
+ RouteContext rc = new DefaultRouteContext(super.context);
+ StreamCaching.enable(rc);
+ assertStrategyEnabled("Enabling StreamCaching should add it to the intercept strategies", rc);
+ StreamCaching.enable(rc);
+ assertStrategyEnabled("Enabling it again should not add a second instance", rc);
+ }
+
+ /*
+ * Assert that the strategy is enabled exactly one time
+ */
+ private void assertStrategyEnabled(String message, RouteContext rc) {
+ int count = 0;
+ for (InterceptStrategy strategy : rc.getInterceptStrategies()) {
+ if (strategy instanceof StreamCaching) {
+ count++;
+ }
+ }
+ assertEquals(message, 1, count);
+ }
+
+}
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/StreamCachingTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java?rev=740251&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java Tue Feb 3 09:16:53 2009
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util;
+
+import junit.framework.TestCase;
+
+import org.apache.camel.Message;
+import org.apache.camel.converter.stream.StreamCache;
+import org.apache.camel.impl.DefaultMessage;
+
+/**
+ * Test cases for {@link MessageHelper}
+ */
+public class MessageHelperTest extends TestCase {
+
+ private Message message;
+
+ @Override
+ protected void setUp() throws Exception {
+ message = new DefaultMessage();
+ }
+
+ /*
+ * Tests the {@link MessageHelper#resetStreamCache(Message)} method
+ */
+ public void testResetStreamCache() throws Exception {
+ // should not throw exceptions when Message or message body is null
+ MessageHelper.resetStreamCache((Message) null);
+ MessageHelper.resetStreamCache(message);
+
+ // handle StreamCache
+ final ValueHolder<Boolean> reset = new ValueHolder<Boolean>(Boolean.FALSE);
+ message.setBody(new StreamCache() {
+ public void reset() {
+ reset.set(Boolean.TRUE);
+ }
+ });
+ MessageHelper.resetStreamCache(message);
+ assertTrue("Should have reset the stream cache", reset.get());
+ }
+}
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java
------------------------------------------------------------------------------
svn:eol-style = native