Posted to dev@camel.apache.org by "lakshmi.prashant" <la...@gmail.com> on 2014/09/03 10:41:50 UTC

Stream Cache file deletion before aggregation in Multicast, involving huge data

Mybeans.xml <http://camel.465427.n5.nabble.com/file/n5755810/Mybeans.xml>  

Hi,

 We are using Camel 2.13.2. I have a multicast route with an
AggregationStrategy.
 In each multicast branch, a custom Camel component returns huge data
(around 4 MB) and writes it to a StreamCache (CachedOutputStream). We need
to aggregate that data in the multicast's AggregationStrategy.


  In the AggregationStrategy, I need to do XPath evaluation using Camel's
XPathBuilder.
  Hence, I try to read the body and convert it from StreamCache to byte[] to
avoid 'Error during type conversion from type:
org.apache.camel.converter.stream.InputStreamCache.' in the XPathBuilder.

When I try to read the body at the beginning of the AggregationStrategy, I
get the following error:

/tmp/camel/camel-tmp-4e00bf8a-4a42-463a-b046-5ea2d7fc8161/cos6047774870387520936.tmp
(No such file or directory), cause:
FileNotFoundException:/tmp/camel/camel-tmp-4e00bf8a-4a42-463a-b046-5ea2d7fc8161/cos6047774870387520936.tmp
(No such file or directory).
	at java.io.FileInputStream.open(Native Method)
	at java.io.FileInputStream.<init>(FileInputStream.java:138)
	at org.apache.camel.converter.stream.FileInputStreamCache.createInputStream(FileInputStreamCache.java:123)
	at org.apache.camel.converter.stream.FileInputStreamCache.getInputStream(FileInputStreamCache.java:117)
	at org.apache.camel.converter.stream.FileInputStreamCache.writeTo(FileInputStreamCache.java:93)
	at org.apache.camel.converter.stream.StreamCacheConverter.convertToByteArray(StreamCacheConverter.java:102)
	at com.sap.it.rt.camel.aggregate.strategies.MergeAtXPathAggregationStrategy.convertToByteArray(MergeAtXPathAggregationStrategy.java:169)
	at com.sap.it.rt.camel.aggregate.strategies.MergeAtXPathAggregationStrategy.convertToXpathCompatibleType(MergeAtXPathAggregationStrategy.java:161)
         
The following code is where the error is thrown:

    Object body = exchange.getIn().getBody();
    if (body instanceof StreamCache) {
        StreamCache cache = (StreamCache) body;
        // the convertToByteArray(...) call below is the one that fails
        xml = new String(convertToByteArray(cache, exchange));
        exchange.getIn().setBody(xml);
    }



When we set a threshold of 10MB in the multicast-related routes, so that the
stream cache is no longer written to file, the aggregation strategy works.
But we do not want to rely on that, as the incoming data may be bigger than
the threshold.

<camel:camelContext id="multicast_xml_1" streamCache="true">
<camel:properties>
<camel:property key="CamelCachedOutputStreamCipherTransformation" value="RC4"/>
<camel:property key="CamelCachedOutputStreamThreshold" value="100000000"/>
</camel:properties>
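
To illustrate why raising the threshold hides the problem, here is a minimal plain-Java sketch of a threshold-based spooling buffer. It is not Camel's CachedOutputStream; the class SpoolingBuffer and the helper spoolsToDisk are hypothetical names used only for illustration. As long as the data stays at or below the threshold, no temporary file exists, so there is nothing that can be deleted too early.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical stand-in for a spooling stream cache; NOT Camel's CachedOutputStream. */
class SpoolingBuffer extends OutputStream {
    private final long threshold;          // bytes kept in memory before spooling
    private ByteArrayOutputStream memory = new ByteArrayOutputStream();
    private OutputStream fileOut;          // non-null once data has moved to disk
    private Path tempFile;
    private long written;

    SpoolingBuffer(long threshold) {
        this.threshold = threshold;
    }

    @Override
    public void write(int b) throws IOException {
        // Switch to a temp file as soon as this byte would exceed the threshold.
        if (fileOut == null && written + 1 > threshold) {
            spoolToDisk();
        }
        OutputStream target = (fileOut != null) ? fileOut : memory;
        target.write(b);
        written++;
    }

    private void spoolToDisk() throws IOException {
        tempFile = Files.createTempFile("spool-", ".tmp");
        fileOut = Files.newOutputStream(tempFile);
        memory.writeTo(fileOut);           // move what was buffered so far
        memory = null;
    }

    boolean isSpooledToDisk() {
        return tempFile != null;
    }

    @Override
    public void close() throws IOException {
        if (fileOut != null) {
            fileOut.close();
        }
    }

    /** Writes size bytes and reports whether a temp file was needed. */
    static boolean spoolsToDisk(long threshold, int size) {
        SpoolingBuffer buffer = new SpoolingBuffer(threshold);
        try {
            buffer.write(new byte[size]);
            buffer.close();
            if (buffer.tempFile != null) {
                Files.delete(buffer.tempFile);   // tidy up after the demo
            }
            return buffer.isSpooledToDisk();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With a threshold of 16 bytes, spoolsToDisk(16, 16) stays in memory while spoolsToDisk(16, 17) creates a temporary file. The same reasoning applies to our 4 MB payloads: a high threshold only avoids the file, it does not fix the cleanup ordering.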

Note: The FileNotFoundException does not appear if we use the
StreamCache-based Camel component in a route with other processors, but
without Multicast + Aggregation.

Can you please let us know why the streamcache related temporary file gets
deleted before the aggregation of the branch exchange?

After debugging, I now understand the issue with aggregating huge data
from StreamCache with MulticastProcessor.

In MulticastProcessor.java, doProcessParallel() is called, and on completion
of the multicast branch exchange the CachedOutputStream deletes / cleans
up the temporary file.

 This happens before the multicast branch exchange reaches the
AggregationStrategy, which tries to read the data from the branch exchange.
In the case of huge data in the StreamCache, the temporary file is already
deleted, leading to the FileNotFoundException.
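
The ordering problem can be reproduced without Camel. The following is a minimal plain-Java simulation, not Camel code: BranchExchange, runBranch and readBody are hypothetical stand-ins for a multicast branch exchange, the component that spools its result to a temporary file, and the read done in the AggregationStrategy. Note that java.nio reports the missing file as NoSuchFileException, whereas FileInputStream in the trace above reports FileNotFoundException.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Plain-Java simulation of the ordering problem; none of these are Camel classes. */
class PrematureCleanupDemo {

    /** Hypothetical stand-in for a multicast branch exchange. */
    static class BranchExchange {
        final Path spooledBody;
        final List<Runnable> onCompletion = new ArrayList<>();

        BranchExchange(Path spooledBody) {
            this.spooledBody = spooledBody;
        }

        /** Marks the branch as done and runs its completion callbacks. */
        void done() {
            onCompletion.forEach(Runnable::run);
        }
    }

    /** Spools the payload to a temp file and registers its deletion on completion. */
    static BranchExchange runBranch(String payload) throws IOException {
        Path tmp = Files.createTempFile("cos", ".tmp");
        Files.write(tmp, payload.getBytes(StandardCharsets.UTF_8));
        BranchExchange branch = new BranchExchange(tmp);
        branch.onCompletion.add(() -> {
            try {
                Files.deleteIfExists(tmp);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        return branch;
    }

    /** What the aggregation step does: read the spooled body back. */
    static String readBody(BranchExchange branch) throws IOException {
        return new String(Files.readAllBytes(branch.spooledBody), StandardCharsets.UTF_8);
    }

    /** Completes the branch first, then aggregates; returns true if the file was gone. */
    static boolean aggregationFailsAfterCompletion() {
        try {
            BranchExchange branch = runBranch("<a/>");
            branch.done();                  // completion runs before aggregation
            try {
                readBody(branch);
                return false;
            } catch (NoSuchFileException e) {
                return true;                // temp file was already deleted
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("aggregation failed: " + aggregationFailsAfterCompletion());
    }
}
```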

   
    public CachedOutputStream(Exchange exchange, boolean closedOnCompletion) {
        this.strategy = exchange.getContext().getStreamCachingStrategy();
        currentStream = new CachedByteArrayOutputStream(strategy.getBufferSize());

        if (closedOnCompletion) {
            // add on completion so we can cleanup after the exchange is done such as deleting temporary files
            exchange.addOnCompletion(new SynchronizationAdapter() {
                @Override
                public void onDone(Exchange exchange) {
                    try {
                        if (fileInputStreamCache != null) {
                            fileInputStreamCache.close();
                        }
                        close();   // <-- cleans up the temporary file, see close() below
                    } catch (Exception e) {
                        LOG.warn("Error deleting temporary cache file: " + tempFile, e);
                    }
                }

                @Override
                public String toString() {
                    return "OnCompletion[CachedOutputStream]";
                }
            });
        }
    }

    public void close() throws IOException {
        currentStream.close();
        cleanUpTempFile();
    }

<http://camel.465427.n5.nabble.com/file/n5756005/StreamCache_File_Gets_Deleted_before_Aggregation.png> 


I was able to circumvent the issue by setting closedOnCompletion=false
when writing to the CachedOutputStream in any component in any multicast
branch.
But this is a leaky solution, because the stream cache temporary file(s) may
then never get cleaned up.

Can the MulticastProcessor be adjusted so that the multicast branch
exchanges reach 'completion' status only after they have been aggregated at
the end of the multicast?
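
As a rough sketch of the ordering I am asking for (plain Java, not a patch to MulticastProcessor; DeferredCleanupDemo and its methods are hypothetical names), each branch only registers its cleanup, and the callbacks run after aggregation has read the spooled bodies:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of deferring branch cleanup until after aggregation. */
class DeferredCleanupDemo {

    /**
     * Spools each payload to a temp file, aggregates the bodies, and only then
     * runs the cleanup callbacks. The created temp files are added to
     * spooledFiles so that a caller can check they were removed.
     */
    static String multicast(List<String> payloads, List<Path> spooledFiles) {
        List<Path> created = new ArrayList<>();
        List<Runnable> deferredCleanups = new ArrayList<>();
        StringBuilder aggregated = new StringBuilder();
        try {
            // 1. Each branch spools its result but only registers its cleanup.
            for (String payload : payloads) {
                Path tmp = Files.createTempFile("cos", ".tmp");
                Files.write(tmp, payload.getBytes(StandardCharsets.UTF_8));
                created.add(tmp);
                deferredCleanups.add(() -> delete(tmp));
            }
            // 2. Aggregation reads the spooled bodies while the files still exist.
            for (Path file : created) {
                aggregated.append(new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            // 3. Completion callbacks run after aggregation, even if it failed.
            deferredCleanups.forEach(Runnable::run);
            spooledFiles.addAll(created);
        }
        return aggregated.toString();
    }

    static void delete(Path file) {
        try {
            Files.deleteIfExists(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With this ordering the aggregation sees every body, and the temporary files are still removed afterwards, so it avoids the leak that comes with closedOnCompletion=false.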

Please advise on how we can resolve this issue.

Thanks,
Lakshmi



--
View this message in context: http://camel.465427.n5.nabble.com/Stream-Cache-file-deletion-before-aggregation-in-Multicast-involving-huge-data-tp5756005.html