You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by si...@apache.org on 2019/02/07 11:10:56 UTC

[camel] branch camel-2.22.x updated: CAMEL-13168 - Delay StreamCache file deletion till calling LUW is done

This is an automated email from the ASF dual-hosted git repository.

siano pushed a commit to branch camel-2.22.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.22.x by this push:
     new 13c4d98  CAMEL-13168 - Delay StreamCache file deletion till calling LUW is done
13c4d98 is described below

commit 13c4d9867f3827cff083f1aa291e0ae2c5f8736e
Author: Stephan Siano <st...@sap.com>
AuthorDate: Thu Feb 7 12:00:52 2019 +0100

    CAMEL-13168 - Delay StreamCache file deletion till calling LUW is done
---
 .../camel/component/directvm/DirectVmProcessor.java     |  8 ++++++++
 .../apache/camel/processor/StreamCachingInOutTest.java  | 17 ++++++++++++++++-
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProcessor.java b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProcessor.java
index 2844f6a..167c539 100644
--- a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProcessor.java
@@ -91,6 +91,14 @@ public final class DirectVmProcessor extends DelegateAsyncProcessor {
         Exchange newExchange = ExchangeHelper.copyExchangeAndSetCamelContext(exchange, endpoint.getCamelContext(), false);
         // set the from endpoint
         newExchange.setFromEndpoint(endpoint);
+        // The StreamCache created by the child routes must not be 
+        // closed by the unit of work of the child route, but by the unit of 
+        // work of the parent route or grand parent route or grand grand parent route ...(in case of nesting).
+        // Set therefore the unit of work of the  parent route as stream cache unit of work, 
+        // if it is not already set.
+        if (newExchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK) == null) {
+            newExchange.setProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, exchange.getUnitOfWork());
+        }
         return newExchange;
     }
 
diff --git a/camel-core/src/test/java/org/apache/camel/processor/StreamCachingInOutTest.java b/camel-core/src/test/java/org/apache/camel/processor/StreamCachingInOutTest.java
index 2514961..1284e67 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/StreamCachingInOutTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/StreamCachingInOutTest.java
@@ -38,13 +38,28 @@ public class StreamCachingInOutTest extends ContextTestSupport {
         assertEquals(c.assertExchangeReceived(0).getIn().getBody(String.class), "James,Guillaume,Hiram,Rob,Roman");
     }
 
+    @Test
+    public void testStreamCachingPerRouteWithDirecVM() throws Exception {
+        MockEndpoint e = getMockEndpoint("mock:e");
+        e.expectedMessageCount(1);
+
+        InputStream message = getTestFileStream();
+        template.sendBody("direct:e", message);
+
+        assertMockEndpointsSatisfied();
+        assertEquals(e.assertExchangeReceived(0).getIn().getBody(String.class), "James,Guillaume,Hiram,Rob,Roman");
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("direct:c").noStreamCaching().to("direct:d").to("mock:c");
+                context.getStreamCachingStrategy().setSpoolThreshold(1);
+                from("direct:c").noStreamCaching().to("direct:d").convertBodyTo(String.class).to("mock:c");
                 from("direct:d").streamCaching().process(new TestProcessor());
+                from("direct:e").noStreamCaching().to("direct-vm:f").convertBodyTo(String.class).to("mock:e");
+                from("direct-vm:f").streamCaching().process(new TestProcessor());
             }
         };
     }