Posted to dev@camel.apache.org by "lakshmi.prashant" <la...@gmail.com> on 2014/09/05 13:18:47 UTC

Streamcache issue - deletion of spooled files in multicast sub exchanges before aggregationstrategy

Hi, 
Mybeans.xml <http://camel.465427.n5.nabble.com/file/n5756091/Mybeans.xml>  

*Issue:*   

  Whenever a Camel component in a multicast branch spools data to a file via
CachedOutputStream, that data becomes unreadable in:

a) Aggregation Strategy of Multicast 
b) After multicast, in case there is no aggregation strategy 

We are getting: 

a) FileNotFoundException, because the spooled file is deleted when the cloned
branch exchange completes. 
b) Premature end of file, when we read the data from the InputStream with
XMLReader / StAX. 

If we use the constructor new CachedOutputStream(exchange, false), the
stream cache file is not deleted. 
But then the file may never be cleaned up, and we do not want that. 


*Details*

 We are using Camel 2.13.2. I have a multicast route with an
AggregationStrategy.   
 In each multicast branch, we have a custom Camel component that returns
large data (around 4 MB) and writes it to a StreamCache (CachedOutputStream);
we need to aggregate that data in the multicast's AggregationStrategy. 


  In the Aggregation strategy, I need to do XPath evaluation using camel
XPathBuilder. 
  Hence, I try to read the body and convert from StreamCache to byte[] to
avoid 'Error during type conversion from type:
org.apache.camel.converter.stream.InputStreamCache.' in the XPathBuilder. 
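
To illustrate the conversion step described above, here is a minimal JDK-only sketch (no Camel classes) of draining a stream into a byte[] once and then evaluating XPath expressions against it repeatedly. The class name BufferedXPathDemo, its helper methods, and the sample XML are hypothetical and only for illustration; this is not the code of our aggregation strategy.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

// Hypothetical helper, for illustration only.
class BufferedXPathDemo {

    // Drain the stream once into memory so the payload can be re-read any number of times.
    static byte[] readFully(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[8192];
        int n;
        while ((n = in.read(chunk)) != -1) {
            out.write(chunk, 0, n);
        }
        return out.toByteArray();
    }

    // Evaluate an XPath expression against the buffered bytes; every call parses a fresh stream.
    static String evaluate(byte[] xml, String expression) throws XPathExpressionException {
        XPath xpath = XPathFactory.newInstance().newXPath();
        return xpath.evaluate(expression, new InputSource(new ByteArrayInputStream(xml)));
    }

    public static void main(String[] args) throws Exception {
        InputStream body = new ByteArrayInputStream(
                "<order><id>42</id><item>book</item></order>".getBytes(StandardCharsets.UTF_8));
        byte[] xml = readFully(body);
        System.out.println(evaluate(xml, "/order/id"));   // prints 42
        System.out.println(evaluate(xml, "/order/item")); // prints book
    }
}
```

Handing the raw bytes to the XML parser, rather than first building a String with the platform default charset, lets the parser honor the encoding declared in the XML itself. The obvious cost is that the whole payload is held in memory.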

When I try to read the body at the beginning of the AggregationStrategy, I
get the following error. 

/tmp/camel/camel-tmp-4e00bf8a-4a42-463a-b046-5ea2d7fc8161/cos6047774870387520936.tmp
(No such file or directory), cause:
FileNotFoundException:/tmp/camel/camel-tmp-4e00bf8a-4a42-463a-b046-5ea2d7fc8161/cos6047774870387520936.tmp
(No such file or directory). 
        at java.io.FileInputStream.open(Native Method) 
        at java.io.FileInputStream.<init>(FileInputStream.java:138) 
        at org.apache.camel.converter.stream.FileInputStreamCache.createInputStream(FileInputStreamCache.java:123) 
        at org.apache.camel.converter.stream.FileInputStreamCache.getInputStream(FileInputStreamCache.java:117) 
        at org.apache.camel.converter.stream.FileInputStreamCache.writeTo(FileInputStreamCache.java:93) 
        at org.apache.camel.converter.stream.StreamCacheConverter.convertToByteArray(StreamCacheConverter.java:102) 
        at com.sap.it.rt.camel.aggregate.strategies.MergeAtXPathAggregationStrategy.convertToByteArray(MergeAtXPathAggregationStrategy.java:169) 
        at com.sap.it.rt.camel.aggregate.strategies.MergeAtXPathAggregationStrategy.convertToXpathCompatibleType(MergeAtXPathAggregationStrategy.java:161) 
          
The following code is where the error is thrown: 

                Object body = exchange.getIn().getBody(); 
                if (body instanceof StreamCache) { 
                        StreamCache cache = (StreamCache) body; 
                        xml = new String(convertToByteArray(cache, exchange)); 
                        exchange.getIn().setBody(xml); 
                } 


<http://camel.465427.n5.nabble.com/file/n5756091/StreamCache_File_Gets_Deleted_before_Aggregation.png> 

By setting a threshold of 10MB in the multicast-related routes, so that the
stream cache no longer spools to a file, we were able to get the aggregation
strategy working. But we do not want to rely on that, as incoming data may
be bigger than the threshold. 

<camel:camelContext id="multicast_xml_1" streamCache="true">
<camel:properties>
<camel:property key="CamelCachedOutputStreamCipherTransformation" value="RC4"/>
<camel:property key="CamelCachedOutputStreamThreshold" value="100000000"/>
</camel:properties>
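
To make the role of the threshold concrete, here is a small JDK-only model of threshold-based spooling: data stays in memory until the configured size is exceeded and is then moved to a temporary file. SpoolingBuffer and all of its methods are hypothetical names for this sketch; it is a simplified stand-in and not Camel's CachedOutputStream.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical, simplified model of threshold-based spooling (not a Camel class).
class SpoolingBuffer extends OutputStream {
    private final int threshold;
    private ByteArrayOutputStream memory = new ByteArrayOutputStream();
    private OutputStream fileOut;
    private Path tempFile;
    private long written;

    SpoolingBuffer(int threshold) {
        this.threshold = threshold;
    }

    @Override
    public void write(int b) throws IOException {
        write(new byte[] {(byte) b}, 0, 1);
    }

    @Override
    public void write(byte[] buf, int off, int len) throws IOException {
        if (fileOut == null && written + len > threshold) {
            // Crossing the threshold: move what is buffered so far into a temp file.
            tempFile = Files.createTempFile("spool", ".tmp");
            fileOut = Files.newOutputStream(tempFile);
            memory.writeTo(fileOut);
            memory = null;
        }
        if (fileOut != null) {
            fileOut.write(buf, off, len);
        } else {
            memory.write(buf, off, len);
        }
        written += len;
    }

    boolean isSpooledToFile() {
        return tempFile != null;
    }

    Path getTempFile() {
        return tempFile;
    }

    @Override
    public void close() throws IOException {
        if (fileOut != null) {
            fileOut.close();
        }
    }

    // Close the stream and delete the temp file, if one was created.
    void cleanUp() throws IOException {
        close();
        if (tempFile != null) {
            Files.deleteIfExists(tempFile);
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = new byte[4 * 1024];
        SpoolingBuffer small = new SpoolingBuffer(10 * 1024);
        small.write(payload);
        SpoolingBuffer large = new SpoolingBuffer(1024);
        large.write(payload);
        System.out.println(small.isSpooledToFile()); // prints false
        System.out.println(large.isSpooledToFile()); // prints true
        small.cleanUp();
        large.cleanUp();
    }
}
```

With a high enough threshold nothing ever reaches the disk, so there is no temporary file that could be deleted too early. That is why raising the threshold hides the problem instead of solving it.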

Note: The FileNotFound issue does not appear if we have the StreamCache
based Camel component in a route with other processors, but without
Multicast + Aggregation. 

After debugging, I understood the problem with aggregating large StreamCache
data in MulticastProcessor. 

In MulticastProcessor.java, doProcessParallel() is called, and when a
multicast branch exchange completes, the CachedOutputStream cleans up
(deletes) the temporary file. 

 This happens before the multicast branch exchange reaches the aggregation
strategy, which then tries to read the data from the branch exchange. With
large data in the StreamCache, the temporary file has already been deleted,
leading to the FileNotFoundException. 
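
The ordering problem described above can be reproduced without Camel. The following JDK-only sketch models a branch exchange whose completion callback deletes the spooled file, and shows that a read attempted after completion fails with FileNotFoundException. BranchExchange and PrematureCleanupDemo are hypothetical names for this illustration and do not correspond to Camel classes.

```java
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a multicast branch exchange (not a Camel class).
class BranchExchange {
    final Path spooledBody;
    private final List<Runnable> onCompletion = new ArrayList<Runnable>();

    BranchExchange(byte[] data) throws IOException {
        final Path file = Files.createTempFile("cos", ".tmp");
        Files.write(file, data);
        this.spooledBody = file;
        // Models closedOnCompletion=true: the temp file is deleted when the exchange is done.
        onCompletion.add(() -> {
            try {
                Files.deleteIfExists(file);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }

    void done() {
        for (Runnable callback : onCompletion) {
            callback.run();
        }
    }
}

class PrematureCleanupDemo {

    // Returns true if the "aggregation" step could still open the spooled body.
    static boolean canReadBody(boolean completeBeforeAggregation) throws IOException {
        BranchExchange branch = new BranchExchange(new byte[] {1, 2, 3});
        if (completeBeforeAggregation) {
            branch.done(); // the branch completes first, deleting the spooled file
        }
        try (InputStream in = new FileInputStream(branch.spooledBody.toFile())) {
            return in.read() != -1;
        } catch (FileNotFoundException e) {
            return false;
        } finally {
            branch.done(); // always remove the temp file at the end of the demo
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(canReadBody(true));  // prints false
        System.out.println(canReadBody(false)); // prints true
    }
}
```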

    
    public CachedOutputStream(Exchange exchange, boolean closedOnCompletion) { 
        this.strategy = exchange.getContext().getStreamCachingStrategy(); 
        currentStream = new CachedByteArrayOutputStream(strategy.getBufferSize()); 

        if (closedOnCompletion) { 
            // add on completion so we can cleanup after the exchange is done such as deleting temporary files 
            exchange.addOnCompletion(new SynchronizationAdapter() { 
                @Override 
                public void onDone(Exchange exchange) { 
                    try { 
                        if (fileInputStreamCache != null) { 
                            fileInputStreamCache.close(); 
                        } 
                        close(); 
                    } catch (Exception e) { 
                        LOG.warn("Error deleting temporary cache file: " + tempFile, e); 
                    } 
                } 

                @Override 
                public String toString() { 
                    return "OnCompletion[CachedOutputStream]"; 
                } 
            }); 
        } 
    } 

    public void close() throws IOException { 
        currentStream.close(); 
        cleanUpTempFile(); 
    }




I was able to circumvent the issue by setting closedOnCompletion=false
when writing to CachedOutputStream in any component in any multicast
branch.
But this is a leaky solution, because the stream cache temporary file(s) may
then never get cleaned up. 

Can the MulticastProcessor be adjusted so that the multicast branch
exchanges reach 'completion' status only after they have been aggregated at
the end of the multicast? 
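
To show the ordering I am asking for, here is a JDK-only sketch in which the branches run first, aggregation reads the spooled files second, and cleanup happens last. DeferredCompletionDemo and its multicast method are hypothetical; this models only the requested order of events and makes no claim about how MulticastProcessor is or should be implemented.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of "complete the branches only after aggregation" (not Camel code).
class DeferredCompletionDemo {

    // Runs every branch, aggregates, and only then cleans up the spooled files.
    // The path of each spooled file is added to spooledFiles so callers can verify the cleanup.
    static String multicast(List<String> branchBodies, List<Path> spooledFiles) throws IOException {
        StringBuilder aggregated = new StringBuilder();
        try {
            // Phase 1: each branch spools its result to a temp file.
            for (String body : branchBodies) {
                Path file = Files.createTempFile("cos", ".tmp");
                spooledFiles.add(file);
                Files.write(file, body.getBytes(StandardCharsets.UTF_8));
            }
            // Phase 2: aggregation reads the spooled files while they still exist.
            for (Path file : spooledFiles) {
                aggregated.append(new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
            }
        } finally {
            // Phase 3: completion (temp file cleanup) runs only after aggregation.
            for (Path file : spooledFiles) {
                Files.deleteIfExists(file);
            }
        }
        return aggregated.toString();
    }

    public static void main(String[] args) throws IOException {
        List<Path> files = new ArrayList<Path>();
        System.out.println(multicast(Arrays.asList("<a/>", "<b/>"), files)); // prints <a/><b/>
    }
}
```

Because the cleanup sits in a finally block, the temporary files are removed even if aggregation throws, which avoids the leak that closedOnCompletion=false would introduce.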

Please help / advise on the issue, as I am new to using camel Multicast. 



--
View this message in context: http://camel.465427.n5.nabble.com/Streamcache-issue-deletion-of-spooled-files-in-multicast-sub-exchanges-before-aggregationstrategy-tp5756091.html
Sent from the Camel Development mailing list archive at Nabble.com.