Posted to users@camel.apache.org by "lakshmi.prashant" <la...@gmail.com> on 2015/01/14 12:43:06 UTC

Data Corruption in SFTP in Parallel Multicast branches

Hi,

 • If there are SFTP receivers at the end of the branch(es) of a parallel
multicast, the payload gets corrupted in one or more branches.
     o One or more SFTP receivers may not receive the full data in their
respective SFTP file(s).

 
While validating it with a simple example (SFTP->parallel multicast->2*SFTP
in 2 different branches): A test with a large payload showed that both
receiver files were created but as soon as one is finished the other one
stops too. This already indicates that there are no two different streams
that are processed but just one stream that is handed to the SFTP endpoints. 


The trace log also showed that when the body is a cached stream, the same
instance was used by the SFTP receivers / endpoints in both branches:

Debug Trace:
2015 01 13 08:45:21#+00#DEBUG#org.apache.camel.component.file.remote.SftpOperations##anonymous#Camel (Test_Multicast) thread #7 - Multicast##avatarmercury#itjkmerct1#ifl##About to store file: rec2 using stream: org.apache.camel.converter.stream.FileInputStreamCache@11ef331a|
2015 01 13 08:45:21#+00#DEBUG#org.apache.camel.component.file.remote.SftpOperations##anonymous#Camel (Test_Multicast) thread #6 - Multicast##avatarmercury#itjkmerct1#ifl##About to store file: rec1 using stream: org.apache.camel.converter.stream.FileInputStreamCache@11ef331a|


Both use the same stream instance.
MulticastProcessor.createProcessorExchangePairs seems to create the copies.
The copying is based on Exchange.copy and Message.copy. Message.copy does
not copy streams, which would be required for the multicast to work properly
with streams.
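
To make the effect concrete outside of Camel, here is a minimal sketch using
only java.io and java.util.concurrent. It is an illustration, not Camel code:
the class and method names (SharedStreamDemo, drain, consumeInParallel) are
made up, and a ByteArrayInputStream stands in for the cached stream. Two
threads are handed the same stream instance, and each byte reaches only one of
them, so neither branch can rely on seeing the whole payload.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; it only imitates two parallel branches.
class SharedStreamDemo {

    // Reads the stream to the end and returns how many bytes this consumer received.
    static int drain(InputStream in) {
        byte[] buffer = new byte[1024];
        int total = 0;
        try {
            int n;
            while ((n = in.read(buffer)) != -1) {
                total += n;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return total;
    }

    // Drains both streams on separate threads, like two parallel multicast branches.
    static int[] consumeInParallel(InputStream first, InputStream second) {
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> drain(first));
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> drain(second));
        return new int[] {a.join(), b.join()};
    }

    public static void main(String[] args) {
        byte[] payload = new byte[1_000_000];

        // The same instance is handed to both branches.
        InputStream shared = new ByteArrayInputStream(payload);
        int[] counts = consumeInParallel(shared, shared);

        // The two counts add up to the payload size: the data is divided, not duplicated.
        System.out.println(counts[0] + " + " + counts[1] + " = " + (counts[0] + counts[1]));
    }
}
```

How the bytes are divided between the two counts depends on thread timing;
only the sum is stable.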

We could work around the issue by converting the payload / body from the
cached stream to byte[] before the multicast, or just before the SFTP
endpoints in the multicast branches (e.g. using ${in.bodyAs(byte[])}
within camel:simple).
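
The idea behind this workaround can be shown as a plain Java sketch rather
than a Camel route. The names (MaterializeBeforeFanOut, materialize,
streamsForBranches) are hypothetical, and InputStream.readAllBytes assumes
Java 9 or later. The one-shot stream is read into a byte[] once, and each
branch then gets its own stream over that array.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; it mirrors the byte[] conversion, not Camel's internals.
class MaterializeBeforeFanOut {

    // Reads the one-shot stream completely into memory (Java 9+).
    static byte[] materialize(InputStream body) {
        try {
            return body.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Gives every branch its own stream, with its own read position, over the same bytes.
    static List<InputStream> streamsForBranches(byte[] data, int branches) {
        List<InputStream> result = new ArrayList<>();
        for (int i = 0; i < branches; i++) {
            result.add(new ByteArrayInputStream(data));
        }
        return result;
    }

    public static void main(String[] args) {
        InputStream body = new ByteArrayInputStream("large payload".getBytes(StandardCharsets.UTF_8));
        byte[] data = materialize(body);

        // Each branch reads the complete payload independently of the other.
        for (InputStream branch : streamsForBranches(data, 2)) {
            System.out.println(new String(materialize(branch), StandardCharsets.UTF_8));
        }
    }
}
```

The price is that the whole payload is held in memory, which may matter for
very large files.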

This problem does not occur with sequential multicast.

Can you please let us know whether this is a known issue, or whether it has
already been fixed?

Thanks,
Lakshmi







--
View this message in context: http://camel.465427.n5.nabble.com/Data-Corruption-in-SFTP-in-Parallel-Multicast-branches-tp5761673.html
Sent from the Camel - Users mailing list archive at Nabble.com.

RE: Data Corruption in SFTP in Parallel Multicast branches

Posted by "Siano, Stephan" <st...@sap.com>.
Hi Lakshmi,

Sure, the exchange should be copied (and it actually is copied). The question is whether to also do a deep copy of the StreamCache instance (including the underlying file), or to modify the StreamCache implementations so that they no longer implement InputStream but return a new InputStream instance for each consumer (all of which then work on the same underlying data). Maybe there is also a third way (like having some partial clone of the StreamCache that does not copy the underlying data), but in that case it might be rather tricky to determine when the underlying file may be deleted.
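
One conceivable way to handle the deletion question is reference counting. The sketch below is only an assumption about how that could look, not how Camel does it: SharedTempFile, View, openView and release are invented names. Every consumer opens its own stream on the same temporary file, and the file is deleted when the last reference is given back.

```java
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not a Camel class.
class SharedTempFile {
    private final Path file;
    // Starts at 1: the creator holds the first reference.
    private final AtomicInteger references = new AtomicInteger(1);

    SharedTempFile(byte[] data) {
        try {
            file = Files.createTempFile("shared-cache", ".tmp");
            Files.write(file, data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // An independent stream on the shared file; closing it returns its reference.
    final class View extends FilterInputStream {
        private boolean closed;

        private View(InputStream in) {
            super(in);
        }

        @Override
        public synchronized void close() {
            if (closed) {
                return;
            }
            closed = true;
            try {
                super.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } finally {
                SharedTempFile.this.release();
            }
        }
    }

    // Takes a reference and opens a new stream; fails if the file is already gone.
    View openView() {
        references.incrementAndGet();
        try {
            return new View(Files.newInputStream(file));
        } catch (IOException e) {
            release();
            throw new UncheckedIOException(e);
        }
    }

    // Drops one reference; whoever drops the last one deletes the file.
    void release() {
        if (references.decrementAndGet() == 0) {
            try {
                Files.deleteIfExists(file);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    boolean exists() {
        return Files.exists(file);
    }

    public static void main(String[] args) {
        SharedTempFile shared = new SharedTempFile(new byte[] {1, 2, 3});
        View branch1 = shared.openView();
        View branch2 = shared.openView();
        shared.release();                      // the creator is done with the data
        branch1.close();
        System.out.println(shared.exists());   // branch 2 still holds a reference
        branch2.close();
        System.out.println(shared.exists());   // last reference returned, file deleted
    }
}
```

A branch that never closes its stream would keep the file alive, which is part of why this approach is tricky in practice.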

All this is somewhat intrusive into the core architecture of Camel, so I wonder what the Camel architects think about this.

Best regards
Stephan

-----Original Message-----
From: lakshmi.prashant [mailto:lakshmi.prashant@gmail.com] 
Sent: Friday, 16 January 2015 08:47
To: users@camel.apache.org
Subject: RE: Data Corruption in SFTP in Parallel Multicast branches

Hi Stephan,

   The body of the main exchange should be copied to the branch exchanges,
as intended (Option 2 suggested by you).
   However, I am not sure whether this will lead to performance / memory
issues if there are many branches and the body in the main route is huge.

 Thanks,
 Lakshmi





RE: Data Corruption in SFTP in Parallel Multicast branches

Posted by "Siano, Stephan" <st...@sap.com>.
Hi,

You can look that up yourself in
https://issues.apache.org/jira/browse/CAMEL-8284 

There you will see that it is fixed in Camel 2.15.0 (not in 2.14)

If you need that in 2.14, you might be able to build your own (patched) version of 2.14.x and cherry pick the changes to it (there are four related changes).

Best regards
Stephan

-----Original Message-----
From: deepaktaker [mailto:deepaktaker@gmail.com] 
Sent: Thursday, 5 November 2015 04:12
To: users@camel.apache.org
Subject: Re: Data Corruption in SFTP in Parallel Multicast branches

Hi,
    I would like to know whether the fix for the JIRA issue CAMEL-8284,
created for the bug "Data Corruption in SFTP in Parallel Multicast branches",
has been released as a patch. I was wondering whether the patch is available
for Camel version 2.14.

Regards,

Deepak




Re: Data Corruption in SFTP in Parallel Multicast branches

Posted by Franz Paul Forsthofer <em...@googlemail.com>.
Hello Lakshmi and Stephan,

I created the bug https://issues.apache.org/jira/browse/CAMEL-8284

It does contain a solution proposal. Please have a look.

Regards Franz


RE: Data Corruption in SFTP in Parallel Multicast branches

Posted by "Siano, Stephan" <st...@sap.com>.
Hi,

I have had a look into the source code:

If you look into the StreamCacheConverter, an InputStream is converted to a StreamCache object by copying the data to a CachedOutputStream and then calling the newStreamCache() method. That will generate a FileInputStreamCache or a ByteArrayStreamCache. Both classes are StreamCache and InputStream at the same time, so if the StreamCache is converted to an InputStream, the instance itself is returned.

In the parallel multicast case (and probably also in the splitter), having a StreamCache before the split will result in all branches getting the same InputStream instance (which will lead to pretty garbled output if two threads are reading from it at the same time).

I see two potential ways to resolve this issue:
One way would be to change the StreamCache implementations so that they do not implement InputStream, but to provide a type converter to InputStream which generates a new InputStream for each consumer.

The other way would be to copy the StreamCache instance in case of a multicast.

What do you think would be the better solution?
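
To make the two options easier to compare, here is a rough sketch in plain Java. Neither class exists in Camel; ReplayableBody and CopyableCache are names invented for illustration, and both keep the data in memory for brevity.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

class StreamCacheOptions {

    // Option 1 (hypothetical type): the cache is NOT an InputStream.
    // Every consumer asks for a stream and gets a fresh one over the same bytes.
    static final class ReplayableBody {
        private final byte[] data;

        ReplayableBody(byte[] data) {
            this.data = data;
        }

        InputStream openStream() {
            return new ByteArrayInputStream(data);
        }
    }

    // Option 2 (hypothetical type): the cache stays an InputStream,
    // and the multicast copies it once per branch before handing it over.
    static final class CopyableCache extends ByteArrayInputStream {
        CopyableCache(byte[] data) {
            super(data);
        }

        // The copy shares the byte array but reads from the start with its own position.
        CopyableCache copy() {
            return new CopyableCache(buf);
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = {10, 20, 30};

        ReplayableBody body = new ReplayableBody(data);
        InputStream branch1 = body.openStream();
        InputStream branch2 = body.openStream();
        branch1.read();                        // advancing branch 1 ...
        System.out.println(branch2.read());    // ... leaves branch 2 at the first byte

        CopyableCache cache = new CopyableCache(data);
        CopyableCache copy = cache.copy();
        cache.read();                          // advancing the original ...
        System.out.println(copy.read());       // ... leaves the copy at the first byte
    }
}
```

With option 1 no consumer can ever get the shared instance by accident, while option 2 relies on every fan-out point remembering to call copy().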

Best regards
Stephan


Re: Data Corruption in SFTP in Parallel Multicast branches

Posted by alexey-s <al...@mail.ru>.
Try replacing the Multicast with a Recipient List with
parallelProcessing=true.

Multicast: 
The Multicast allows to route the same message to a number of endpoints and
process them in a different way.

Recipient List:
The recipients will receive a copy of the same Exchange.




Re: Data Corruption in SFTP in Parallel Multicast branches

Posted by Franz Paul Forsthofer <em...@googlemail.com>.
Hello Lakshmi,

it could be that this issue is related to
https://issues.apache.org/jira/browse/CAMEL-8688

There we have made a patch in the current snapshot (version 2.16)

Can you try to reproduce your problem on the current snapshot?

Regards Franz

On Tue, May 12, 2015 at 11:39 AM, lakshmi.prashant
<la...@gmail.com> wrote:
> Hi,
>
>  If direct is changed to SEDA, the multiple branches still fail.
>
> Error processing exchange. Exchange[Message: [Body is instance of
> org.apache.camel.StreamCache]]. Caused by: [org.quartz.JobExecutionException
> - org.apache.camel.CamelExchangeException: Parallel processing failed for
> number 0. Exchange[Message: [Body is instance of
> org.apache.camel.StreamCache]].
>
>  Caused by: [org.apache.camel.builder.xml.InvalidXPathExpression - Invalid
> xpath: /queryCompoundEmployeeResponse/CompoundEmployee.
> Reason: javax.xml.xpath.XPathExpressionException: Failure converting a node
> of class javax.xml.transform.sax.SAXSource: I/O error reported by XML parser
> processing null: Invalid byte 2 of 4-byte UTF-8 sequence.]]|
>
>
>  If I convert the streamcache to String / byte[] after the stream cache
> producer & before the multicast, the issue does not arise.
>
>
> <camel:to  uri="myProducer"/>
> <camel:convertBodyTo type="java.lang.String" charset="UTF-8" />
> <camel:multicast id="ParallelGateway_1" parallelProcessing="true"
> stopOnException="true">
>         <camel:to uri="direct://SequenceFlow_5"/>
>         <camel:to uri="direct://SequenceFlow_7"/>
> </camel:multicast>
> </camel:route>
>
> Thanks,
> Lakshmi
>
>
>

Re: Data Corruption in SFTP in Parallel Multicast branches

Posted by contactreji <co...@gmail.com>.
Hi

Have you tried replacing
<camel:multicast id="ParallelGateway_1" parallelProcessing="true"
stopOnException="true">
        <camel:to uri="direct://SequenceFlow_5"/>
        <camel:to uri="direct://SequenceFlow_7"/>
</camel:multicast>


With

<camel:multicast id="ParallelGateway_1" parallelProcessing="true"
stopOnException="true">
        <camel:inOnly uri="seda:SequenceFlow_5"/>
        <camel:inOnly uri="seda:SequenceFlow_7"/>
</camel:multicast>

Reji



-----
Reji Mathews
Sr. Developer - Middleware Integration / SOA ( Open Source - Apache Camel & Jboss Fuse ESB | Mule ESB )
LinkedIn - http://in.linkedin.com/pub/reji-mathews/31/9a2/40a
Twitter - reji_mathews

Re: Data Corruption in SFTP in Parallel Multicast branches

Posted by "lakshmi.prashant" <la...@gmail.com>.
Hi,

  We have another example of data corruption with parallel multicast. This
issue is present even in camel-core 2.14.2.

  If we change to serial multicast, the issue disappears.

  I have a route with a Camel producer (to uri) that writes a large amount
of data of type StreamCache into the exchange body.

  Immediately afterwards, I have a parallel multicast, and each multicast
branch has an XPath-based content filter. I get either of the exceptions
below:

Error = org.apache.camel.builder.xml.InvalidXPathExpression: Invalid xpath:
/queryCompoundEmployeeResponse/CompoundEmployee. Reason:
javax.xml.xpath.XPathExpressionException: Failure converting a node of class
javax.xml.transform.sax.SAXSource: I/O error reported by XML parser
processing null: Invalid byte 2 of 2-byte UTF-8 sequence., cause:
javax.xml.xpath.XPathExpressionException: Failure converting a node of class
javax.xml.transform.sax.SAXSource: I/O error reported by XML parser
processing null: Invalid byte 2 of 2-byte UTF-8 sequence.

(or)

org.apache.camel.CamelExchangeException: Parallel processing failed for
number 0. Exchange[Message: [Body is not logged]]. Caused by:
[org.apache.camel.TypeConversionException - Error during type conversion
from type: java.lang.String to the required type: org.w3c.dom.Document with
value [Body is not logged] due org.xml.sax.SAXParseException; lineNumber: 1;
columnNumber: 1; Content is not allowed in prolog.]]|


I have even tried to set the Exchange.CHARSET_NAME property in the exchange,
but that does not help.

My beans.xml looks like this:


<camel:route>
<camel:from
uri="quartz2://test.parallel.multicast.errorTimerEventDefinition11?fireNow=true&amp;trigger.repeatCount=0&amp;trigger.repeatInterval=0"/>

<camel:setProperty propertyName="Exchange.CHARSET_NAME">
	<camel:constant>UTF-8</camel:constant>
</camel:setProperty>

<camel:to  uri="myProducer"/> 
	
	
<camel:multicast id="ParallelGateway_1" parallelProcessing="true"
stopOnException="true">
	<camel:to uri="direct://SequenceFlow_5"/>
	<camel:to uri="direct://SequenceFlow_7"/>
</camel:multicast>
</camel:route>

<camel:route>
	<camel:from uri="direct://SequenceFlow_5"/>
	
	<camel:setBody id="CallActivity_4_1429516652727">
		<camel:xpath
resultType="org.w3c.dom.NodeList">//CompoundEmployee</camel:xpath>
	</camel:setBody>
	<camel:setBody id="CallActivity_5_1429516652731">
	
<camel:simple>&lt;CompoundResponse&gt;${in.body}&lt;/CompoundResponse&gt;</camel:simple>
	</camel:setBody>
	<camel:to id="MessageFlow_2_1429516652733"
uri="sap-util:preventPathTraversal"/>
	<camel:to ref="Receiver1_"/>
	
</camel:route>


<camel:route>
	<camel:from uri="direct://SequenceFlow_7"/>
	<camel:setBody id="CallActivity_7_1429516652738">
		<camel:xpath 
resultType="org.w3c.dom.NodeList">/queryCompoundEmployeeResponse/CompoundEmployee</camel:xpath>
	</camel:setBody>
	<camel:setBody id="CallActivity_1_1429516652742">
	
<camel:simple>&lt;CompoundResponse&gt;${in.body}&lt;/CompoundResponse&gt;</camel:simple>
	</camel:setBody>
	<camel:to id="MessageFlow_1_1429516652744"
uri="sap-util:preventPathTraversal"/>
	<camel:to ref="Receiver2_"/>	
	
</camel:route>


It looks like both branches are still reading from the same stream, and
that causes a race condition.
The error indicates that, because of this, the XML parser doing the XPath
extraction in one branch finds unexpected characters at the start of the data
in the body of the (branch) exchange.

Debug Trace:

	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:105)
	at org.apache.camel.processor.MulticastProcessor.doProcessParallel(MulticastProcessor.java:736)
	at org.apache.camel.processor.MulticastProcessor.access$200(MulticastProcessor.java:83)
	at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:304)
	at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:289)
	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:812)
Caused by: org.apache.camel.RuntimeCamelException: com.sun.org.apache.xerces.internal.impl.io.MalformedByteSequenceException: Invalid byte 1 of 1-byte UTF-8 sequence.
	at org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(ObjectHelper.java:1364)
	at org.apache.camel.util.ObjectHelper.invokeMethod(ObjectHelper.java:1006)
	at org.apache.camel.impl.converter.InstanceMethodTypeConverter.convertTo(InstanceMethodTypeConverter.java:78)
	at org.apache.camel.impl.converter.BaseTypeConverterRegistry.doConvertTo(BaseTypeConverterRegistry.java:276)
	at org.apache.camel.impl.converter.BaseTypeConverterRegistry.convertTo(BaseTypeConverterRegistry.java:114)
	... 35 common frames omitted
Caused by: com.sun.org.apache.xerces.internal.impl.io.MalformedByteSequenceException: Invalid byte 1 of 1-byte UTF-8 sequence.
	at com.sun.org.apache.xerces.internal.impl.io.UTF8Reader.invalidByte(UTF8Reader.java:687)
	at com.sun.org.apache.xerces.internal.impl.io.UTF8Reader.read(UTF8Reader.java:557)
	at com.sun.org.apache.xerces.internal.impl.XMLEntityScanner.load(XMLEntityScanner.java:1802)
	at com.sun.org.apache.xerces.internal.impl.XMLEntityScanner.arrangeCapacity(XMLEntityScanner.java:1670)
	at com.sun.org.apache.xerces.internal.impl.XMLEntityScanner.skipString(XMLEntityScanner.java:1708)
	at com.sun.org.apache.xerces.internal.impl.XMLDocumentFragmentScannerImpl.scanEndElement(XMLDocumentFragmentScannerImpl.java:1748)
	at com.sun.org.apache.xerces.internal.impl.XMLDocumentFragmentScannerImpl$FragmentContentDriver.next(XMLDocumentFragmentScannerImpl.java:2973)
	at com.sun.org.apache.xerces.internal.impl.XMLDocumentScannerImpl.next(XMLDocumentScannerImpl.java:606)
	at com.sun.org.apache.xerces.internal.impl.XMLNSDocumentScannerImpl.next(XMLNSDocumentScannerImpl.java:117)
	at com.sun.org.apache.xerces.internal.impl.XMLDocumentFragmentScannerImpl.scanDocument(XMLDocumentFragmentScannerImpl.java:510)
	at com.sun.org.apache.xerces.internal.parsers.XML11Configuration.parse(XML11Configuration.java:857)
	at com.sun.org.apache.xerces.internal.parsers.XML11Configuration.parse(XML11Configuration.java:777)
	at com.sun.org.apache.xerces.internal.parsers.XMLParser.parse(XMLParser.java:141)
	at com.sun.org.apache.xerces.internal.parsers.DOMParser.parse(DOMParser.java:243)
	at com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderImpl.parse(DocumentBuilderImpl.java:347)
	at javax.xml.parsers.DocumentBuilder.parse(DocumentBuilder.java:128)
	at org.apache.camel.converter.jaxp.XmlConverter.toDOMDocument(XmlConverter.java:860)


Can you kindly help with a possible solution, or is this a bug in Camel
parallel multicast?

Thanks,
Lakshmi



