You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by si...@apache.org on 2019/02/07 11:05:34 UTC
[camel] branch camel-2.23.x updated: CAMEL-13168 - Delay StreamCache file deletion till calling LUW is done
This is an automated email from the ASF dual-hosted git repository.
siano pushed a commit to branch camel-2.23.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.23.x by this push:
new e2137a7 CAMEL-13168 - Delay StreamCache file deletion till calling LUW is done
e2137a7 is described below
commit e2137a732fb1c37c313f6ffe4f5ba1baaafc0ac2
Author: Stephan Siano <st...@sap.com>
AuthorDate: Thu Feb 7 12:00:52 2019 +0100
CAMEL-13168 - Delay StreamCache file deletion till calling LUW is done
---
.../camel/component/directvm/DirectVmProcessor.java | 8 ++++++++
.../apache/camel/processor/StreamCachingInOutTest.java | 17 ++++++++++++++++-
2 files changed, 24 insertions(+), 1 deletion(-)
diff --git a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProcessor.java b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProcessor.java
index 2844f6a..167c539 100644
--- a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProcessor.java
@@ -91,6 +91,14 @@ public final class DirectVmProcessor extends DelegateAsyncProcessor {
Exchange newExchange = ExchangeHelper.copyExchangeAndSetCamelContext(exchange, endpoint.getCamelContext(), false);
// set the from endpoint
newExchange.setFromEndpoint(endpoint);
+ // The StreamCache created by the child routes must not be
+ // closed by the unit of work of the child route, but by the unit of
+ // work of the parent route or grand parent route or grand grand parent route ...(in case of nesting).
+ // Set therefore the unit of work of the parent route as stream cache unit of work,
+ // if it is not already set.
+ if (newExchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK) == null) {
+ newExchange.setProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, exchange.getUnitOfWork());
+ }
return newExchange;
}
diff --git a/camel-core/src/test/java/org/apache/camel/processor/StreamCachingInOutTest.java b/camel-core/src/test/java/org/apache/camel/processor/StreamCachingInOutTest.java
index 36adc4d..78caf8a 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/StreamCachingInOutTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/StreamCachingInOutTest.java
@@ -40,13 +40,28 @@ public class StreamCachingInOutTest extends ContextTestSupport {
assertEquals(c.assertExchangeReceived(0).getIn().getBody(String.class), "James,Guillaume,Hiram,Rob,Roman");
}
+ @Test
+ public void testStreamCachingPerRouteWithDirecVM() throws Exception {
+ MockEndpoint e = getMockEndpoint("mock:e");
+ e.expectedMessageCount(1);
+
+ InputStream message = getTestFileStream();
+ template.sendBody("direct:e", message);
+
+ assertMockEndpointsSatisfied();
+ assertEquals(e.assertExchangeReceived(0).getIn().getBody(String.class), "James,Guillaume,Hiram,Rob,Roman");
+ }
+
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("direct:c").noStreamCaching().to("direct:d").to("mock:c");
+ context.getStreamCachingStrategy().setSpoolThreshold(1);
+ from("direct:c").noStreamCaching().to("direct:d").convertBodyTo(String.class).to("mock:c");
from("direct:d").streamCaching().process(new TestProcessor());
+ from("direct:e").noStreamCaching().to("direct-vm:f").convertBodyTo(String.class).to("mock:e");
+ from("direct-vm:f").streamCaching().process(new TestProcessor());
}
};
}