You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Geert Vanheusden <ge...@aviovision.com> on 2015/04/17 18:05:33 UTC

Closed (removed) StreamCache when doing a Wiretap

Hi,

I noticed a bug where the body (StreamCache) was already removed before the
exchange reached the end (in the Wiretap route).

I found the following ticket
https://issues.apache.org/jira/browse/CAMEL-8386 and code
https://fisheye6.atlassian.com/changelog/camel-git?cs=4661cbb94513d6047e58581b23dcd4a6fad166f7
but I think it still doesn't fix the Wiretap problem.

Here you can find my test (executed on 2.15.1). If you disable the
StreamCaching or remove the delay it works, enabling it again will break
the test.

============
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultStreamCachingStrategy;
import org.apache.camel.spi.StreamCachingStrategy;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Before;
import org.junit.Test;

public class WireTapTest extends CamelTestSupport {

private MockEndpoint y;
private MockEndpoint z;

@Before
public void prepareEndpoints() {
y = getMockEndpoint("mock:file:y");
z = getMockEndpoint("mock:file:z");
}

@Test
public void
testSendingAMessageUsingWiretapShouldNotDeleteStreamBeforeWiretappedExcangeIsComplete()
throws InterruptedException {
y.expectedMessageCount(1);
z.expectedMessageCount(1);

// test.txt should contain more than one character
template.sendBody("direct:start",
this.getClass().getResourceAsStream("/test.txt"));

assertMockEndpointsSatisfied();
}

@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
StreamCachingStrategy streamCachingStrategy = new
DefaultStreamCachingStrategy();
streamCachingStrategy.setSpoolThreshold(1);
context.setStreamCachingStrategy(streamCachingStrategy);
context.setStreamCaching(true);

from("direct:start")
.wireTap("direct:x")
.to("file:y");

from("direct:x")
.delay(2000)
.to("file:z");
}
};
}

@Override
public String isMockEndpoints() {
return "(file:z|file:y)";
}
}
=============

If you run the test you can clearly see the temp file deletion followed by
the closed stream exception:

Tried 1 to delete file:
/var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp
with result: true

Cannot reset stream from file
/var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp

I encountered the same issue during a more complex route that does some
splitting (zip file) and multicasting. This occurred on Camel 2.14.1 so it
could be fixed by https://issues.apache.org/jira/browse/CAMEL-8284 but I
need to test this.

Kind regards,

Geert

Re: Closed (removed) StreamCache when doing a Wiretap

Posted by Geert Vanheusden <ge...@aviovision.com>.
Hi Franz,

I would certainly prefer this approach. In our case we are handling large
files so doing a copy means unnecessary delays and storage usage.

Regards,

Geert


On Wed, Apr 22, 2015 at 10:17 AM, Franz Paul Forsthofer <
emc2fpf@googlemail.com> wrote:

> Hello Claus and Henryk,
>
> my original proposal to copy the stream cache file is not the optimal
> solution. A better solution would be to have only one stream cache
> file and to delete this file only when all exchanges which need this
> file are done. This does mean we have to register listeners to the
> event onDone of all UnitOfWorks of the relevant exchanges in the
> stream cache file object and only when all listeners have got the
> onDone event, then the file can be deleted.  However this will require
> quite some changes
>
> Probably we could also use this solution for the agregator and splitter
> case..
>
> Regards Franz
>
> On Wed, Apr 22, 2015 at 9:47 AM, Claus Ibsen <cl...@gmail.com>
> wrote:
> > Hi
> >
> > Yeah we should likely have a StreamCacheHelper or introduce a
> > copy(boolean clone) method that takes a boolean (with a better name)
> > that indicate it does a indpendenent copy. Then we can keep the inner
> > details how this copy does in those stream cache implementations.
> >
> > The wire tap already does a copy of the stream cache today. But it
> > likely need that clone copy. We could also make that the default.
> > Though I think multicast eip does a copy as well but it may reuse the
> > same underlying file, and only delete it when last exchange is done
> > and closes it.
> >
> >
> >
> > On Wed, Apr 22, 2015 at 8:13 AM, Henryk Konsek <he...@gmail.com>
> wrote:
> >> Hi,
> >>
> >> You can also use Wiretap's onPrepareRef option and use custom processor
> to
> >> copy the content of the cached body.
> >>
> >> Franz, would you be so kind and create a pull request with your fix?
> >> Somebody will review it and merge. Thanks in advance!
> >>
> >> Cheers!
> >>
> >> wt., 21.04.2015 o 16:25 użytkownik Franz Paul Forsthofer <
> >> emc2fpf@googlemail.com> napisał:
> >>
> >>> Hi Geert,
> >>>
> >>> it is a bug. You can try as a workaround to set the threshold
> >>> (streamCachingStrategy.setSpoolThreshold(huge_number);) to a huge
> >>> number; then the body will be kept in memory.
> >>>
> >>> Alternatively, you can modify the code of the Camel class
> >>> org.apache.camel.processor.WireTapProcessor. You have to modifiy the
> >>> method configureCopyExchange in the following way:
> >>>
> >>>
> >>>    private Exchange configureCopyExchange(Exchange exchange) throws
> >>> IOException {
> >>>
> >>>
> >>>         // must use a copy as we dont want it to cause side effects of
> >>> the original exchange
> >>>         Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange,
> >>> false);
> >>>
> >>>         if (copy.getIn().getBody() instanceof FileInputStreamCache) {
> >>>             //the file stream must be copied, otherwise you get errors
> >>> because the stream file is removed when the parent route is finished
> >>>             FileInputStreamCache streamCache = (FileInputStreamCache)
> >>> exchange.getIn().getBody();
> >>>             CachedOutputStream cos = new CachedOutputStream(copy);
> >>>             try {
> >>>               IOHelper.copy(streamCache, cos);
> >>>             } finally {
> >>>               IOHelper.close(streamCache, cos);
> >>>               streamCache.reset();
> >>>             }
> >>>             copy.getIn().setBody(cos.newStreamCache());
> >>>         }
> >>>         // set MEP to InOnly as this wire tap is a fire and forget
> >>>         copy.setPattern(ExchangePattern.InOnly);
> >>>         return copy;
> >>>     }
> >>>
> >>> The idea behind this is to make a copy of the stream cache file, so
> >>> that you get an additional stream cache file for the second route (in
> >>> your case for the route "direct:x"). This second stream cache file
> >>> will be deleted when the second route is finished.
> >>>
> >>> I also hope that this issue will be fixed. I am no committer so I
> >>> cannot say when this issue will be solved; I have made contributions
> >>> which solved a similar problem in the aggregator and splitter.
> >>>
> >>> I think you can open a Jira ticket with the above solution suggestion.
> >>>
> >>> Regards Franz
> >>>
> >>> On Tue, Apr 21, 2015 at 11:13 AM, Geert Vanheusden
> >>> <ge...@aviovision.com> wrote:
> >>> > Hi Franz,
> >>> >
> >>> > is this something that will be fixed in an upcoming release? Is it a
> bug
> >>> or
> >>> > does it work as designed?
> >>> > Can we use a workaround to avoid this behaviour, for example by not
> >>> > deleting the temp files?
> >>> >
> >>> >
> >>> > Kind regards,
> >>> >
> >>> > Geert
> >>> >
> >>> > On Tue, Apr 21, 2015 at 10:37 AM, Franz Paul Forsthofer <
> >>> > emc2fpf@googlemail.com> wrote:
> >>> >
> >>> >> Hello Geert,
> >>> >>
> >>> >> there is no solution yet for your problem. Currently the stream
> cache
> >>> >> file is removed at the end of the route which created the file. In
> >>> >> your case the stream cache file is deleted when the "direct:start"
> >>> >> route is finished. The wire tap runs in a separate thread and
> >>> >> therefore it can happen that it tries to read the cached file when
> it
> >>> >> is already deleted, especially when you have a delay in the wiretap
> >>> >> route ("direct:x").
> >>> >>
> >>> >>
> >>> >> Regards Franz
> >>> >>
> >>> >> On Fri, Apr 17, 2015 at 6:05 PM, Geert Vanheusden
> >>> >> <ge...@aviovision.com> wrote:
> >>> >> > Hi,
> >>> >> >
> >>> >> > I noticed a bug where the body (StreamCache) was already removed
> >>> before
> >>> >> the
> >>> >> > exchange reached the end (in the Wiretap route).
> >>> >> >
> >>> >> > I found the following ticket
> >>> >> > https://issues.apache.org/jira/browse/CAMEL-8386 and code
> >>> >> >
> >>> >>
> >>>
> https://fisheye6.atlassian.com/changelog/camel-git?cs=4661cbb94513d6047e58581b23dcd4a6fad166f7
> >>> >> > but I think it still doesn't fix the Wiretap problem.
> >>> >> >
> >>> >> > Here you can find my test (executed on 2.15.1). If you disable the
> >>> >> > StreamCaching or remove the delay it works, enabling it again will
> >>> break
> >>> >> > the test.
> >>> >> >
> >>> >> > ============
> >>> >> > import org.apache.camel.builder.RouteBuilder;
> >>> >> > import org.apache.camel.component.mock.MockEndpoint;
> >>> >> > import org.apache.camel.impl.DefaultStreamCachingStrategy;
> >>> >> > import org.apache.camel.spi.StreamCachingStrategy;
> >>> >> > import org.apache.camel.test.junit4.CamelTestSupport;
> >>> >> > import org.junit.Before;
> >>> >> > import org.junit.Test;
> >>> >> >
> >>> >> > public class WireTapTest extends CamelTestSupport {
> >>> >> >
> >>> >> > private MockEndpoint y;
> >>> >> > private MockEndpoint z;
> >>> >> >
> >>> >> > @Before
> >>> >> > public void prepareEndpoints() {
> >>> >> > y = getMockEndpoint("mock:file:y");
> >>> >> > z = getMockEndpoint("mock:file:z");
> >>> >> > }
> >>> >> >
> >>> >> > @Test
> >>> >> > public void
> >>> >> >
> >>> >>
> >>>
> testSendingAMessageUsingWiretapShouldNotDeleteStreamBeforeWiretappedExcangeIsComplete()
> >>> >> > throws InterruptedException {
> >>> >> > y.expectedMessageCount(1);
> >>> >> > z.expectedMessageCount(1);
> >>> >> >
> >>> >> > // test.txt should contain more than one character
> >>> >> > template.sendBody("direct:start",
> >>> >> > this.getClass().getResourceAsStream("/test.txt"));
> >>> >> >
> >>> >> > assertMockEndpointsSatisfied();
> >>> >> > }
> >>> >> >
> >>> >> > @Override
> >>> >> > protected RouteBuilder createRouteBuilder() throws Exception {
> >>> >> > return new RouteBuilder() {
> >>> >> > @Override
> >>> >> > public void configure() throws Exception {
> >>> >> > StreamCachingStrategy streamCachingStrategy = new
> >>> >> > DefaultStreamCachingStrategy();
> >>> >> > streamCachingStrategy.setSpoolThreshold(1);
> >>> >> > context.setStreamCachingStrategy(streamCachingStrategy);
> >>> >> > context.setStreamCaching(true);
> >>> >> >
> >>> >> > from("direct:start")
> >>> >> > .wireTap("direct:x")
> >>> >> > .to("file:y");
> >>> >> >
> >>> >> > from("direct:x")
> >>> >> > .delay(2000)
> >>> >> > .to("file:z");
> >>> >> > }
> >>> >> > };
> >>> >> > }
> >>> >> >
> >>> >> > @Override
> >>> >> > public String isMockEndpoints() {
> >>> >> > return "(file:z|file:y)";
> >>> >> > }
> >>> >> > }
> >>> >> > =============
> >>> >> >
> >>> >> > If you run the test you can clearly see the temp file deletion
> >>> followed
> >>> >> by
> >>> >> > the closed stream exception:
> >>> >> >
> >>> >> > Tried 1 to delete file:
> >>> >> >
> >>> >>
> >>>
> /var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp
> >>> >> > with result: true
> >>> >> >
> >>> >> > Cannot reset stream from file
> >>> >> >
> >>> >>
> >>>
> /var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp
> >>> >> >
> >>> >> > I encountered the same issue during a more complex route that does
> >>> some
> >>> >> > splitting (zip file) and multicasting. This occurred on Camel
> 2.14.1
> >>> so
> >>> >> it
> >>> >> > could be fixed by
> https://issues.apache.org/jira/browse/CAMEL-8284
> >>> but I
> >>> >> > need to test this.
> >>> >> >
> >>> >> > Kind regards,
> >>> >> >
> >>> >> > Geert
> >>> >>
> >>>
> >
> >
> >
> > --
> > Claus Ibsen
> > -----------------
> > Red Hat, Inc.
> > Email: cibsen@redhat.com
> > Twitter: davsclaus
> > Blog: http://davsclaus.com
> > Author of Camel in Action: http://www.manning.com/ibsen
> > hawtio: http://hawt.io/
> > fabric8: http://fabric8.io/
>

Re: Closed (removed) StreamCache when doing a Wiretap

Posted by Geert Vanheusden <ge...@aviovision.com>.
Ticket created: https://issues.apache.org/jira/browse/CAMEL-8688

Regards,

Geert


On Thu, Apr 23, 2015 at 1:25 PM, Claus Ibsen <cl...@gmail.com> wrote:

> On Wed, Apr 22, 2015 at 10:17 AM, Franz Paul Forsthofer
> <em...@googlemail.com> wrote:
> > Hello Claus and Henryk,
> >
> > my original proposal to copy the stream cache file is not the optimal
> > solution. A better solution would be to have only one stream cache
> > file and to delete this file only when all exchanges which need this
> > file are done. This does mean we have to register listeners to the
> > event onDone of all UnitOfWorks of the relevant exchanges in the
> > stream cache file object and only when all listeners have got the
> > onDone event, then the file can be deleted.  However this will require
> > quite some changes
> >
> > Probably we could also use this solution for the agregator and splitter
> case..
> >
>
> Yeah though that entails the stream cache needs to keep this state and
> be tailored to not only 1 exchange but to potentially N+ exchanges.
>
> But a good idea nevertheless. Especially for big streams.
>
> Logging a JIRA ticket is welcome.
>
>
>
>
> > Regards Franz
> >
> > On Wed, Apr 22, 2015 at 9:47 AM, Claus Ibsen <cl...@gmail.com>
> wrote:
> >> Hi
> >>
> >> Yeah we should likely have a StreamCacheHelper or introduce a
> >> copy(boolean clone) method that takes a boolean (with a better name)
> >> that indicate it does a indpendenent copy. Then we can keep the inner
> >> details how this copy does in those stream cache implementations.
> >>
> >> The wire tap already does a copy of the stream cache today. But it
> >> likely need that clone copy. We could also make that the default.
> >> Though I think multicast eip does a copy as well but it may reuse the
> >> same underlying file, and only delete it when last exchange is done
> >> and closes it.
> >>
> >>
> >>
> >> On Wed, Apr 22, 2015 at 8:13 AM, Henryk Konsek <he...@gmail.com>
> wrote:
> >>> Hi,
> >>>
> >>> You can also use Wiretap's onPrepareRef option and use custom
> processor to
> >>> copy the content of the cached body.
> >>>
> >>> Franz, would you be so kind and create a pull request with your fix?
> >>> Somebody will review it and merge. Thanks in advance!
> >>>
> >>> Cheers!
> >>>
> >>> wt., 21.04.2015 o 16:25 użytkownik Franz Paul Forsthofer <
> >>> emc2fpf@googlemail.com> napisał:
> >>>
> >>>> Hi Geert,
> >>>>
> >>>> it is a bug. You can try as a workaround to set the threshold
> >>>> (streamCachingStrategy.setSpoolThreshold(huge_number);) to a huge
> >>>> number; then the body will be kept in memory.
> >>>>
> >>>> Alternatively, you can modify the code of the Camel class
> >>>> org.apache.camel.processor.WireTapProcessor. You have to modifiy the
> >>>> method configureCopyExchange in the following way:
> >>>>
> >>>>
> >>>>    private Exchange configureCopyExchange(Exchange exchange) throws
> >>>> IOException {
> >>>>
> >>>>
> >>>>         // must use a copy as we dont want it to cause side effects of
> >>>> the original exchange
> >>>>         Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange,
> >>>> false);
> >>>>
> >>>>         if (copy.getIn().getBody() instanceof FileInputStreamCache) {
> >>>>             //the file stream must be copied, otherwise you get errors
> >>>> because the stream file is removed when the parent route is finished
> >>>>             FileInputStreamCache streamCache = (FileInputStreamCache)
> >>>> exchange.getIn().getBody();
> >>>>             CachedOutputStream cos = new CachedOutputStream(copy);
> >>>>             try {
> >>>>               IOHelper.copy(streamCache, cos);
> >>>>             } finally {
> >>>>               IOHelper.close(streamCache, cos);
> >>>>               streamCache.reset();
> >>>>             }
> >>>>             copy.getIn().setBody(cos.newStreamCache());
> >>>>         }
> >>>>         // set MEP to InOnly as this wire tap is a fire and forget
> >>>>         copy.setPattern(ExchangePattern.InOnly);
> >>>>         return copy;
> >>>>     }
> >>>>
> >>>> The idea behind this is to make a copy of the stream cache file, so
> >>>> that you get an additional stream cache file for the second route (in
> >>>> your case for the route "direct:x"). This second stream cache file
> >>>> will be deleted when the second route is finished.
> >>>>
> >>>> I also hope that this issue will be fixed. I am no committer so I
> >>>> cannot say when this issue will be solved; I have made contributions
> >>>> which solved a similar problem in the aggregator and splitter.
> >>>>
> >>>> I think you can open a Jira ticket with the above solution suggestion.
> >>>>
> >>>> Regards Franz
> >>>>
> >>>> On Tue, Apr 21, 2015 at 11:13 AM, Geert Vanheusden
> >>>> <ge...@aviovision.com> wrote:
> >>>> > Hi Franz,
> >>>> >
> >>>> > is this something that will be fixed in an upcoming release? Is it
> a bug
> >>>> or
> >>>> > does it work as designed?
> >>>> > Can we use a workaround to avoid this behaviour, for example by not
> >>>> > deleting the temp files?
> >>>> >
> >>>> >
> >>>> > Kind regards,
> >>>> >
> >>>> > Geert
> >>>> >
> >>>> > On Tue, Apr 21, 2015 at 10:37 AM, Franz Paul Forsthofer <
> >>>> > emc2fpf@googlemail.com> wrote:
> >>>> >
> >>>> >> Hello Geert,
> >>>> >>
> >>>> >> there is no solution yet for your problem. Currently the stream
> cache
> >>>> >> file is removed at the end of the route which created the file. In
> >>>> >> your case the stream cache file is deleted when the "direct:start"
> >>>> >> route is finished. The wire tap runs in a separate thread and
> >>>> >> therefore it can happen that it tries to read the cached file when
> it
> >>>> >> is already deleted, especially when you have a delay in the wiretap
> >>>> >> route ("direct:x").
> >>>> >>
> >>>> >>
> >>>> >> Regards Franz
> >>>> >>
> >>>> >> On Fri, Apr 17, 2015 at 6:05 PM, Geert Vanheusden
> >>>> >> <ge...@aviovision.com> wrote:
> >>>> >> > Hi,
> >>>> >> >
> >>>> >> > I noticed a bug where the body (StreamCache) was already removed
> >>>> before
> >>>> >> the
> >>>> >> > exchange reached the end (in the Wiretap route).
> >>>> >> >
> >>>> >> > I found the following ticket
> >>>> >> > https://issues.apache.org/jira/browse/CAMEL-8386 and code
> >>>> >> >
> >>>> >>
> >>>>
> https://fisheye6.atlassian.com/changelog/camel-git?cs=4661cbb94513d6047e58581b23dcd4a6fad166f7
> >>>> >> > but I think it still doesn't fix the Wiretap problem.
> >>>> >> >
> >>>> >> > Here you can find my test (executed on 2.15.1). If you disable
> the
> >>>> >> > StreamCaching or remove the delay it works, enabling it again
> will
> >>>> break
> >>>> >> > the test.
> >>>> >> >
> >>>> >> > ============
> >>>> >> > import org.apache.camel.builder.RouteBuilder;
> >>>> >> > import org.apache.camel.component.mock.MockEndpoint;
> >>>> >> > import org.apache.camel.impl.DefaultStreamCachingStrategy;
> >>>> >> > import org.apache.camel.spi.StreamCachingStrategy;
> >>>> >> > import org.apache.camel.test.junit4.CamelTestSupport;
> >>>> >> > import org.junit.Before;
> >>>> >> > import org.junit.Test;
> >>>> >> >
> >>>> >> > public class WireTapTest extends CamelTestSupport {
> >>>> >> >
> >>>> >> > private MockEndpoint y;
> >>>> >> > private MockEndpoint z;
> >>>> >> >
> >>>> >> > @Before
> >>>> >> > public void prepareEndpoints() {
> >>>> >> > y = getMockEndpoint("mock:file:y");
> >>>> >> > z = getMockEndpoint("mock:file:z");
> >>>> >> > }
> >>>> >> >
> >>>> >> > @Test
> >>>> >> > public void
> >>>> >> >
> >>>> >>
> >>>>
> testSendingAMessageUsingWiretapShouldNotDeleteStreamBeforeWiretappedExcangeIsComplete()
> >>>> >> > throws InterruptedException {
> >>>> >> > y.expectedMessageCount(1);
> >>>> >> > z.expectedMessageCount(1);
> >>>> >> >
> >>>> >> > // test.txt should contain more than one character
> >>>> >> > template.sendBody("direct:start",
> >>>> >> > this.getClass().getResourceAsStream("/test.txt"));
> >>>> >> >
> >>>> >> > assertMockEndpointsSatisfied();
> >>>> >> > }
> >>>> >> >
> >>>> >> > @Override
> >>>> >> > protected RouteBuilder createRouteBuilder() throws Exception {
> >>>> >> > return new RouteBuilder() {
> >>>> >> > @Override
> >>>> >> > public void configure() throws Exception {
> >>>> >> > StreamCachingStrategy streamCachingStrategy = new
> >>>> >> > DefaultStreamCachingStrategy();
> >>>> >> > streamCachingStrategy.setSpoolThreshold(1);
> >>>> >> > context.setStreamCachingStrategy(streamCachingStrategy);
> >>>> >> > context.setStreamCaching(true);
> >>>> >> >
> >>>> >> > from("direct:start")
> >>>> >> > .wireTap("direct:x")
> >>>> >> > .to("file:y");
> >>>> >> >
> >>>> >> > from("direct:x")
> >>>> >> > .delay(2000)
> >>>> >> > .to("file:z");
> >>>> >> > }
> >>>> >> > };
> >>>> >> > }
> >>>> >> >
> >>>> >> > @Override
> >>>> >> > public String isMockEndpoints() {
> >>>> >> > return "(file:z|file:y)";
> >>>> >> > }
> >>>> >> > }
> >>>> >> > =============
> >>>> >> >
> >>>> >> > If you run the test you can clearly see the temp file deletion
> >>>> followed
> >>>> >> by
> >>>> >> > the closed stream exception:
> >>>> >> >
> >>>> >> > Tried 1 to delete file:
> >>>> >> >
> >>>> >>
> >>>>
> /var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp
> >>>> >> > with result: true
> >>>> >> >
> >>>> >> > Cannot reset stream from file
> >>>> >> >
> >>>> >>
> >>>>
> /var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp
> >>>> >> >
> >>>> >> > I encountered the same issue during a more complex route that
> does
> >>>> some
> >>>> >> > splitting (zip file) and multicasting. This occurred on Camel
> 2.14.1
> >>>> so
> >>>> >> it
> >>>> >> > could be fixed by
> https://issues.apache.org/jira/browse/CAMEL-8284
> >>>> but I
> >>>> >> > need to test this.
> >>>> >> >
> >>>> >> > Kind regards,
> >>>> >> >
> >>>> >> > Geert
> >>>> >>
> >>>>
> >>
> >>
> >>
> >> --
> >> Claus Ibsen
> >> -----------------
> >> Red Hat, Inc.
> >> Email: cibsen@redhat.com
> >> Twitter: davsclaus
> >> Blog: http://davsclaus.com
> >> Author of Camel in Action: http://www.manning.com/ibsen
> >> hawtio: http://hawt.io/
> >> fabric8: http://fabric8.io/
>
>
>
> --
> Claus Ibsen
> -----------------
> Red Hat, Inc.
> Email: cibsen@redhat.com
> Twitter: davsclaus
> Blog: http://davsclaus.com
> Author of Camel in Action: http://www.manning.com/ibsen
> hawtio: http://hawt.io/
> fabric8: http://fabric8.io/
>

Re: Closed (removed) StreamCache when doing a Wiretap

Posted by Claus Ibsen <cl...@gmail.com>.
On Wed, Apr 22, 2015 at 10:17 AM, Franz Paul Forsthofer
<em...@googlemail.com> wrote:
> Hello Claus and Henryk,
>
> my original proposal to copy the stream cache file is not the optimal
> solution. A better solution would be to have only one stream cache
> file and to delete this file only when all exchanges which need this
> file are done. This does mean we have to register listeners to the
> event onDone of all UnitOfWorks of the relevant exchanges in the
> stream cache file object and only when all listeners have got the
> onDone event, then the file can be deleted.  However this will require
> quite some changes
>
> Probably we could also use this solution for the agregator and splitter case..
>

Yeah though that entails the stream cache needs to keep this state and
be tailored to not only 1 exchange but to potentially N+ exchanges.

But a good idea nevertheless. Especially for big streams.

Logging a JIRA ticket is welcome.




> Regards Franz
>
> On Wed, Apr 22, 2015 at 9:47 AM, Claus Ibsen <cl...@gmail.com> wrote:
>> Hi
>>
>> Yeah we should likely have a StreamCacheHelper or introduce a
>> copy(boolean clone) method that takes a boolean (with a better name)
>> that indicate it does a indpendenent copy. Then we can keep the inner
>> details how this copy does in those stream cache implementations.
>>
>> The wire tap already does a copy of the stream cache today. But it
>> likely need that clone copy. We could also make that the default.
>> Though I think multicast eip does a copy as well but it may reuse the
>> same underlying file, and only delete it when last exchange is done
>> and closes it.
>>
>>
>>
>> On Wed, Apr 22, 2015 at 8:13 AM, Henryk Konsek <he...@gmail.com> wrote:
>>> Hi,
>>>
>>> You can also use Wiretap's onPrepareRef option and use custom processor to
>>> copy the content of the cached body.
>>>
>>> Franz, would you be so kind and create a pull request with your fix?
>>> Somebody will review it and merge. Thanks in advance!
>>>
>>> Cheers!
>>>
>>> wt., 21.04.2015 o 16:25 użytkownik Franz Paul Forsthofer <
>>> emc2fpf@googlemail.com> napisał:
>>>
>>>> Hi Geert,
>>>>
>>>> it is a bug. You can try as a workaround to set the threshold
>>>> (streamCachingStrategy.setSpoolThreshold(huge_number);) to a huge
>>>> number; then the body will be kept in memory.
>>>>
>>>> Alternatively, you can modify the code of the Camel class
>>>> org.apache.camel.processor.WireTapProcessor. You have to modifiy the
>>>> method configureCopyExchange in the following way:
>>>>
>>>>
>>>>    private Exchange configureCopyExchange(Exchange exchange) throws
>>>> IOException {
>>>>
>>>>
>>>>         // must use a copy as we dont want it to cause side effects of
>>>> the original exchange
>>>>         Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange,
>>>> false);
>>>>
>>>>         if (copy.getIn().getBody() instanceof FileInputStreamCache) {
>>>>             //the file stream must be copied, otherwise you get errors
>>>> because the stream file is removed when the parent route is finished
>>>>             FileInputStreamCache streamCache = (FileInputStreamCache)
>>>> exchange.getIn().getBody();
>>>>             CachedOutputStream cos = new CachedOutputStream(copy);
>>>>             try {
>>>>               IOHelper.copy(streamCache, cos);
>>>>             } finally {
>>>>               IOHelper.close(streamCache, cos);
>>>>               streamCache.reset();
>>>>             }
>>>>             copy.getIn().setBody(cos.newStreamCache());
>>>>         }
>>>>         // set MEP to InOnly as this wire tap is a fire and forget
>>>>         copy.setPattern(ExchangePattern.InOnly);
>>>>         return copy;
>>>>     }
>>>>
>>>> The idea behind this is to make a copy of the stream cache file, so
>>>> that you get an additional stream cache file for the second route (in
>>>> your case for the route "direct:x"). This second stream cache file
>>>> will be deleted when the second route is finished.
>>>>
>>>> I also hope that this issue will be fixed. I am no committer so I
>>>> cannot say when this issue will be solved; I have made contributions
>>>> which solved a similar problem in the aggregator and splitter.
>>>>
>>>> I think you can open a Jira ticket with the above solution suggestion.
>>>>
>>>> Regards Franz
>>>>
>>>> On Tue, Apr 21, 2015 at 11:13 AM, Geert Vanheusden
>>>> <ge...@aviovision.com> wrote:
>>>> > Hi Franz,
>>>> >
>>>> > is this something that will be fixed in an upcoming release? Is it a bug
>>>> or
>>>> > does it work as designed?
>>>> > Can we use a workaround to avoid this behaviour, for example by not
>>>> > deleting the temp files?
>>>> >
>>>> >
>>>> > Kind regards,
>>>> >
>>>> > Geert
>>>> >
>>>> > On Tue, Apr 21, 2015 at 10:37 AM, Franz Paul Forsthofer <
>>>> > emc2fpf@googlemail.com> wrote:
>>>> >
>>>> >> Hello Geert,
>>>> >>
>>>> >> there is no solution yet for your problem. Currently the stream cache
>>>> >> file is removed at the end of the route which created the file. In
>>>> >> your case the stream cache file is deleted when the "direct:start"
>>>> >> route is finished. The wire tap runs in a separate thread and
>>>> >> therefore it can happen that it tries to read the cached file when it
>>>> >> is already deleted, especially when you have a delay in the wiretap
>>>> >> route ("direct:x").
>>>> >>
>>>> >>
>>>> >> Regards Franz
>>>> >>
>>>> >> On Fri, Apr 17, 2015 at 6:05 PM, Geert Vanheusden
>>>> >> <ge...@aviovision.com> wrote:
>>>> >> > Hi,
>>>> >> >
>>>> >> > I noticed a bug where the body (StreamCache) was already removed
>>>> before
>>>> >> the
>>>> >> > exchange reached the end (in the Wiretap route).
>>>> >> >
>>>> >> > I found the following ticket
>>>> >> > https://issues.apache.org/jira/browse/CAMEL-8386 and code
>>>> >> >
>>>> >>
>>>> https://fisheye6.atlassian.com/changelog/camel-git?cs=4661cbb94513d6047e58581b23dcd4a6fad166f7
>>>> >> > but I think it still doesn't fix the Wiretap problem.
>>>> >> >
>>>> >> > Here you can find my test (executed on 2.15.1). If you disable the
>>>> >> > StreamCaching or remove the delay it works, enabling it again will
>>>> break
>>>> >> > the test.
>>>> >> >
>>>> >> > ============
>>>> >> > import org.apache.camel.builder.RouteBuilder;
>>>> >> > import org.apache.camel.component.mock.MockEndpoint;
>>>> >> > import org.apache.camel.impl.DefaultStreamCachingStrategy;
>>>> >> > import org.apache.camel.spi.StreamCachingStrategy;
>>>> >> > import org.apache.camel.test.junit4.CamelTestSupport;
>>>> >> > import org.junit.Before;
>>>> >> > import org.junit.Test;
>>>> >> >
>>>> >> > public class WireTapTest extends CamelTestSupport {
>>>> >> >
>>>> >> > private MockEndpoint y;
>>>> >> > private MockEndpoint z;
>>>> >> >
>>>> >> > @Before
>>>> >> > public void prepareEndpoints() {
>>>> >> > y = getMockEndpoint("mock:file:y");
>>>> >> > z = getMockEndpoint("mock:file:z");
>>>> >> > }
>>>> >> >
>>>> >> > @Test
>>>> >> > public void
>>>> >> >
>>>> >>
>>>> testSendingAMessageUsingWiretapShouldNotDeleteStreamBeforeWiretappedExcangeIsComplete()
>>>> >> > throws InterruptedException {
>>>> >> > y.expectedMessageCount(1);
>>>> >> > z.expectedMessageCount(1);
>>>> >> >
>>>> >> > // test.txt should contain more than one character
>>>> >> > template.sendBody("direct:start",
>>>> >> > this.getClass().getResourceAsStream("/test.txt"));
>>>> >> >
>>>> >> > assertMockEndpointsSatisfied();
>>>> >> > }
>>>> >> >
>>>> >> > @Override
>>>> >> > protected RouteBuilder createRouteBuilder() throws Exception {
>>>> >> > return new RouteBuilder() {
>>>> >> > @Override
>>>> >> > public void configure() throws Exception {
>>>> >> > StreamCachingStrategy streamCachingStrategy = new
>>>> >> > DefaultStreamCachingStrategy();
>>>> >> > streamCachingStrategy.setSpoolThreshold(1);
>>>> >> > context.setStreamCachingStrategy(streamCachingStrategy);
>>>> >> > context.setStreamCaching(true);
>>>> >> >
>>>> >> > from("direct:start")
>>>> >> > .wireTap("direct:x")
>>>> >> > .to("file:y");
>>>> >> >
>>>> >> > from("direct:x")
>>>> >> > .delay(2000)
>>>> >> > .to("file:z");
>>>> >> > }
>>>> >> > };
>>>> >> > }
>>>> >> >
>>>> >> > @Override
>>>> >> > public String isMockEndpoints() {
>>>> >> > return "(file:z|file:y)";
>>>> >> > }
>>>> >> > }
>>>> >> > =============
>>>> >> >
>>>> >> > If you run the test you can clearly see the temp file deletion
>>>> followed
>>>> >> by
>>>> >> > the closed stream exception:
>>>> >> >
>>>> >> > Tried 1 to delete file:
>>>> >> >
>>>> >>
>>>> /var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp
>>>> >> > with result: true
>>>> >> >
>>>> >> > Cannot reset stream from file
>>>> >> >
>>>> >>
>>>> /var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp
>>>> >> >
>>>> >> > I encountered the same issue during a more complex route that does
>>>> some
>>>> >> > splitting (zip file) and multicasting. This occurred on Camel 2.14.1
>>>> so
>>>> >> it
>>>> >> > could be fixed by https://issues.apache.org/jira/browse/CAMEL-8284
>>>> but I
>>>> >> > need to test this.
>>>> >> >
>>>> >> > Kind regards,
>>>> >> >
>>>> >> > Geert
>>>> >>
>>>>
>>
>>
>>
>> --
>> Claus Ibsen
>> -----------------
>> Red Hat, Inc.
>> Email: cibsen@redhat.com
>> Twitter: davsclaus
>> Blog: http://davsclaus.com
>> Author of Camel in Action: http://www.manning.com/ibsen
>> hawtio: http://hawt.io/
>> fabric8: http://fabric8.io/



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cibsen@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
hawtio: http://hawt.io/
fabric8: http://fabric8.io/

Re: Closed (removed) StreamCache when doing a Wiretap

Posted by Franz Paul Forsthofer <em...@googlemail.com>.
Hello Claus and Henryk,

my original proposal to copy the stream cache file is not the optimal
solution. A better solution would be to have only one stream cache
file and to delete this file only when all exchanges which need this
file are done. This does mean we have to register listeners to the
event onDone of all UnitOfWorks of the relevant exchanges in the
stream cache file object and only when all listeners have got the
onDone event, then the file can be deleted.  However this will require
quite some changes

Probably we could also use this solution for the agregator and splitter case..

Regards Franz

On Wed, Apr 22, 2015 at 9:47 AM, Claus Ibsen <cl...@gmail.com> wrote:
> Hi
>
> Yeah we should likely have a StreamCacheHelper or introduce a
> copy(boolean clone) method that takes a boolean (with a better name)
> that indicate it does a indpendenent copy. Then we can keep the inner
> details how this copy does in those stream cache implementations.
>
> The wire tap already does a copy of the stream cache today. But it
> likely need that clone copy. We could also make that the default.
> Though I think multicast eip does a copy as well but it may reuse the
> same underlying file, and only delete it when last exchange is done
> and closes it.
>
>
>
> On Wed, Apr 22, 2015 at 8:13 AM, Henryk Konsek <he...@gmail.com> wrote:
>> Hi,
>>
>> You can also use Wiretap's onPrepareRef option and use custom processor to
>> copy the content of the cached body.
>>
>> Franz, would you be so kind and create a pull request with your fix?
>> Somebody will review it and merge. Thanks in advance!
>>
>> Cheers!
>>
>> wt., 21.04.2015 o 16:25 użytkownik Franz Paul Forsthofer <
>> emc2fpf@googlemail.com> napisał:
>>
>>> Hi Geert,
>>>
>>> it is a bug. You can try as a workaround to set the threshold
>>> (streamCachingStrategy.setSpoolThreshold(huge_number);) to a huge
>>> number; then the body will be kept in memory.
>>>
>>> Alternatively, you can modify the code of the Camel class
>>> org.apache.camel.processor.WireTapProcessor. You have to modifiy the
>>> method configureCopyExchange in the following way:
>>>
>>>
>>>    private Exchange configureCopyExchange(Exchange exchange) throws
>>> IOException {
>>>
>>>
>>>         // must use a copy as we dont want it to cause side effects of
>>> the original exchange
>>>         Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange,
>>> false);
>>>
>>>         if (copy.getIn().getBody() instanceof FileInputStreamCache) {
>>>             //the file stream must be copied, otherwise you get errors
>>> because the stream file is removed when the parent route is finished
>>>             FileInputStreamCache streamCache = (FileInputStreamCache)
>>> exchange.getIn().getBody();
>>>             CachedOutputStream cos = new CachedOutputStream(copy);
>>>             try {
>>>               IOHelper.copy(streamCache, cos);
>>>             } finally {
>>>               IOHelper.close(streamCache, cos);
>>>               streamCache.reset();
>>>             }
>>>             copy.getIn().setBody(cos.newStreamCache());
>>>         }
>>>         // set MEP to InOnly as this wire tap is a fire and forget
>>>         copy.setPattern(ExchangePattern.InOnly);
>>>         return copy;
>>>     }
>>>
>>> The idea behind this is to make a copy of the stream cache file, so
>>> that you get an additional stream cache file for the second route (in
>>> your case for the route "direct:x"). This second stream cache file
>>> will be deleted when the second route is finished.
>>>
>>> I also hope that this issue will be fixed. I am no committer so I
>>> cannot say when this issue will be solved; I have made contributions
>>> which solved a similar problem in the aggregator and splitter.
>>>
>>> I think you can open a Jira ticket with the above solution suggestion.
>>>
>>> Regards Franz
>>>
>>> On Tue, Apr 21, 2015 at 11:13 AM, Geert Vanheusden
>>> <ge...@aviovision.com> wrote:
>>> > Hi Franz,
>>> >
>>> > is this something that will be fixed in an upcoming release? Is it a bug
>>> or
>>> > does it work as designed?
>>> > Can we use a workaround to avoid this behaviour, for example by not
>>> > deleting the temp files?
>>> >
>>> >
>>> > Kind regards,
>>> >
>>> > Geert
>>> >
>>> > On Tue, Apr 21, 2015 at 10:37 AM, Franz Paul Forsthofer <
>>> > emc2fpf@googlemail.com> wrote:
>>> >
>>> >> Hello Geert,
>>> >>
>>> >> there is no solution yet for your problem. Currently the stream cache
>>> >> file is removed at the end of the route which created the file. In
>>> >> your case the stream cache file is deleted when the "direct:start"
>>> >> route is finished. The wire tap runs in a separate thread and
>>> >> therefore it can happen that it tries to read the cached file when it
>>> >> is already deleted, especially when you have a delay in the wiretap
>>> >> route ("direct:x").
>>> >>
>>> >>
>>> >> Regards Franz
>>> >>
>>> >> On Fri, Apr 17, 2015 at 6:05 PM, Geert Vanheusden
>>> >> <ge...@aviovision.com> wrote:
>>> >> > Hi,
>>> >> >
>>> >> > I noticed a bug where the body (StreamCache) was already removed
>>> before
>>> >> the
>>> >> > exchange reached the end (in the Wiretap route).
>>> >> >
>>> >> > I found the following ticket
>>> >> > https://issues.apache.org/jira/browse/CAMEL-8386 and code
>>> >> >
>>> >>
>>> https://fisheye6.atlassian.com/changelog/camel-git?cs=4661cbb94513d6047e58581b23dcd4a6fad166f7
>>> >> > but I think it still doesn't fix the Wiretap problem.
>>> >> >
>>> >> > Here you can find my test (executed on 2.15.1). If you disable the
>>> >> > StreamCaching or remove the delay it works, enabling it again will
>>> break
>>> >> > the test.
>>> >> >
>>> >> > ============
>>> >> > import org.apache.camel.builder.RouteBuilder;
>>> >> > import org.apache.camel.component.mock.MockEndpoint;
>>> >> > import org.apache.camel.impl.DefaultStreamCachingStrategy;
>>> >> > import org.apache.camel.spi.StreamCachingStrategy;
>>> >> > import org.apache.camel.test.junit4.CamelTestSupport;
>>> >> > import org.junit.Before;
>>> >> > import org.junit.Test;
>>> >> >
>>> >> > public class WireTapTest extends CamelTestSupport {
>>> >> >
>>> >> > private MockEndpoint y;
>>> >> > private MockEndpoint z;
>>> >> >
>>> >> > @Before
>>> >> > public void prepareEndpoints() {
>>> >> > y = getMockEndpoint("mock:file:y");
>>> >> > z = getMockEndpoint("mock:file:z");
>>> >> > }
>>> >> >
>>> >> > @Test
>>> >> > public void
>>> >> >
>>> >>
>>> testSendingAMessageUsingWiretapShouldNotDeleteStreamBeforeWiretappedExcangeIsComplete()
>>> >> > throws InterruptedException {
>>> >> > y.expectedMessageCount(1);
>>> >> > z.expectedMessageCount(1);
>>> >> >
>>> >> > // test.txt should contain more than one character
>>> >> > template.sendBody("direct:start",
>>> >> > this.getClass().getResourceAsStream("/test.txt"));
>>> >> >
>>> >> > assertMockEndpointsSatisfied();
>>> >> > }
>>> >> >
>>> >> > @Override
>>> >> > protected RouteBuilder createRouteBuilder() throws Exception {
>>> >> > return new RouteBuilder() {
>>> >> > @Override
>>> >> > public void configure() throws Exception {
>>> >> > StreamCachingStrategy streamCachingStrategy = new
>>> >> > DefaultStreamCachingStrategy();
>>> >> > streamCachingStrategy.setSpoolThreshold(1);
>>> >> > context.setStreamCachingStrategy(streamCachingStrategy);
>>> >> > context.setStreamCaching(true);
>>> >> >
>>> >> > from("direct:start")
>>> >> > .wireTap("direct:x")
>>> >> > .to("file:y");
>>> >> >
>>> >> > from("direct:x")
>>> >> > .delay(2000)
>>> >> > .to("file:z");
>>> >> > }
>>> >> > };
>>> >> > }
>>> >> >
>>> >> > @Override
>>> >> > public String isMockEndpoints() {
>>> >> > return "(file:z|file:y)";
>>> >> > }
>>> >> > }
>>> >> > =============
>>> >> >
>>> >> > If you run the test you can clearly see the temp file deletion
>>> followed
>>> >> by
>>> >> > the closed stream exception:
>>> >> >
>>> >> > Tried 1 to delete file:
>>> >> >
>>> >>
>>> /var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp
>>> >> > with result: true
>>> >> >
>>> >> > Cannot reset stream from file
>>> >> >
>>> >>
>>> /var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp
>>> >> >
>>> >> > I encountered the same issue during a more complex route that does
>>> some
>>> >> > splitting (zip file) and multicasting. This occurred on Camel 2.14.1
>>> so
>>> >> it
>>> >> > could be fixed by https://issues.apache.org/jira/browse/CAMEL-8284
>>> but I
>>> >> > need to test this.
>>> >> >
>>> >> > Kind regards,
>>> >> >
>>> >> > Geert
>>> >>
>>>
>
>
>
> --
> Claus Ibsen
> -----------------
> Red Hat, Inc.
> Email: cibsen@redhat.com
> Twitter: davsclaus
> Blog: http://davsclaus.com
> Author of Camel in Action: http://www.manning.com/ibsen
> hawtio: http://hawt.io/
> fabric8: http://fabric8.io/

Re: Closed (removed) StreamCache when doing a Wiretap

Posted by Geert Vanheusden <ge...@aviovision.com>.
Hi guys,

Thanks for your help. We currently use the suggested workaround with
onPrepareRef.
Should I still create a ticket in Jira with the test and the suggested fix?

Kind regards,

Geert


On Wed, Apr 22, 2015 at 9:47 AM, Claus Ibsen <cl...@gmail.com> wrote:

> Hi
>
> Yeah we should likely have a StreamCacheHelper or introduce a
> copy(boolean clone) method that takes a boolean (with a better name)
> that indicate it does a indpendenent copy. Then we can keep the inner
> details how this copy does in those stream cache implementations.
>
> The wire tap already does a copy of the stream cache today. But it
> likely need that clone copy. We could also make that the default.
> Though I think multicast eip does a copy as well but it may reuse the
> same underlying file, and only delete it when last exchange is done
> and closes it.
>
>
>
> On Wed, Apr 22, 2015 at 8:13 AM, Henryk Konsek <he...@gmail.com> wrote:
> > Hi,
> >
> > You can also use Wiretap's onPrepareRef option and use custom processor
> to
> > copy the content of the cached body.
> >
> > Franz, would you be so kind and create a pull request with your fix?
> > Somebody will review it and merge. Thanks in advance!
> >
> > Cheers!
> >
> > wt., 21.04.2015 o 16:25 użytkownik Franz Paul Forsthofer <
> > emc2fpf@googlemail.com> napisał:
> >
> >> Hi Geert,
> >>
> >> it is a bug. You can try as a workaround to set the threshold
> >> (streamCachingStrategy.setSpoolThreshold(huge_number);) to a huge
> >> number; then the body will be kept in memory.
> >>
> >> Alternatively, you can modify the code of the Camel class
> >> org.apache.camel.processor.WireTapProcessor. You have to modifiy the
> >> method configureCopyExchange in the following way:
> >>
> >>
> >>    private Exchange configureCopyExchange(Exchange exchange) throws
> >> IOException {
> >>
> >>
> >>         // must use a copy as we dont want it to cause side effects of
> >> the original exchange
> >>         Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange,
> >> false);
> >>
> >>         if (copy.getIn().getBody() instanceof FileInputStreamCache) {
> >>             //the file stream must be copied, otherwise you get errors
> >> because the stream file is removed when the parent route is finished
> >>             FileInputStreamCache streamCache = (FileInputStreamCache)
> >> exchange.getIn().getBody();
> >>             CachedOutputStream cos = new CachedOutputStream(copy);
> >>             try {
> >>               IOHelper.copy(streamCache, cos);
> >>             } finally {
> >>               IOHelper.close(streamCache, cos);
> >>               streamCache.reset();
> >>             }
> >>             copy.getIn().setBody(cos.newStreamCache());
> >>         }
> >>         // set MEP to InOnly as this wire tap is a fire and forget
> >>         copy.setPattern(ExchangePattern.InOnly);
> >>         return copy;
> >>     }
> >>
> >> The idea behind this is to make a copy of the stream cache file, so
> >> that you get an additional stream cache file for the second route (in
> >> your case for the route "direct:x"). This second stream cache file
> >> will be deleted when the second route is finished.
> >>
> >> I also hope that this issue will be fixed. I am no committer so I
> >> cannot say when this issue will be solved; I have made contributions
> >> which solved a similar problem in the aggregator and splitter.
> >>
> >> I think you can open a Jira ticket with the above solution suggestion.
> >>
> >> Regards Franz
> >>
> >> On Tue, Apr 21, 2015 at 11:13 AM, Geert Vanheusden
> >> <ge...@aviovision.com> wrote:
> >> > Hi Franz,
> >> >
> >> > is this something that will be fixed in an upcoming release? Is it a
> bug
> >> or
> >> > does it work as designed?
> >> > Can we use a workaround to avoid this behaviour, for example by not
> >> > deleting the temp files?
> >> >
> >> >
> >> > Kind regards,
> >> >
> >> > Geert
> >> >
> >> > On Tue, Apr 21, 2015 at 10:37 AM, Franz Paul Forsthofer <
> >> > emc2fpf@googlemail.com> wrote:
> >> >
> >> >> Hello Geert,
> >> >>
> >> >> there is no solution yet for your problem. Currently the stream cache
> >> >> file is removed at the end of the route which created the file. In
> >> >> your case the stream cache file is deleted when the "direct:start"
> >> >> route is finished. The wire tap runs in a separate thread and
> >> >> therefore it can happen that it tries to read the cached file when it
> >> >> is already deleted, especially when you have a delay in the wiretap
> >> >> route ("direct:x").
> >> >>
> >> >>
> >> >> Regards Franz
> >> >>
> >> >> On Fri, Apr 17, 2015 at 6:05 PM, Geert Vanheusden
> >> >> <ge...@aviovision.com> wrote:
> >> >> > Hi,
> >> >> >
> >> >> > I noticed a bug where the body (StreamCache) was already removed
> >> before
> >> >> the
> >> >> > exchange reached the end (in the Wiretap route).
> >> >> >
> >> >> > I found the following ticket
> >> >> > https://issues.apache.org/jira/browse/CAMEL-8386 and code
> >> >> >
> >> >>
> >>
> https://fisheye6.atlassian.com/changelog/camel-git?cs=4661cbb94513d6047e58581b23dcd4a6fad166f7
> >> >> > but I think it still doesn't fix the Wiretap problem.
> >> >> >
> >> >> > Here you can find my test (executed on 2.15.1). If you disable the
> >> >> > StreamCaching or remove the delay it works, enabling it again will
> >> break
> >> >> > the test.
> >> >> >
> >> >> > ============
> >> >> > import org.apache.camel.builder.RouteBuilder;
> >> >> > import org.apache.camel.component.mock.MockEndpoint;
> >> >> > import org.apache.camel.impl.DefaultStreamCachingStrategy;
> >> >> > import org.apache.camel.spi.StreamCachingStrategy;
> >> >> > import org.apache.camel.test.junit4.CamelTestSupport;
> >> >> > import org.junit.Before;
> >> >> > import org.junit.Test;
> >> >> >
> >> >> > public class WireTapTest extends CamelTestSupport {
> >> >> >
> >> >> > private MockEndpoint y;
> >> >> > private MockEndpoint z;
> >> >> >
> >> >> > @Before
> >> >> > public void prepareEndpoints() {
> >> >> > y = getMockEndpoint("mock:file:y");
> >> >> > z = getMockEndpoint("mock:file:z");
> >> >> > }
> >> >> >
> >> >> > @Test
> >> >> > public void
> >> >> >
> >> >>
> >>
> testSendingAMessageUsingWiretapShouldNotDeleteStreamBeforeWiretappedExcangeIsComplete()
> >> >> > throws InterruptedException {
> >> >> > y.expectedMessageCount(1);
> >> >> > z.expectedMessageCount(1);
> >> >> >
> >> >> > // test.txt should contain more than one character
> >> >> > template.sendBody("direct:start",
> >> >> > this.getClass().getResourceAsStream("/test.txt"));
> >> >> >
> >> >> > assertMockEndpointsSatisfied();
> >> >> > }
> >> >> >
> >> >> > @Override
> >> >> > protected RouteBuilder createRouteBuilder() throws Exception {
> >> >> > return new RouteBuilder() {
> >> >> > @Override
> >> >> > public void configure() throws Exception {
> >> >> > StreamCachingStrategy streamCachingStrategy = new
> >> >> > DefaultStreamCachingStrategy();
> >> >> > streamCachingStrategy.setSpoolThreshold(1);
> >> >> > context.setStreamCachingStrategy(streamCachingStrategy);
> >> >> > context.setStreamCaching(true);
> >> >> >
> >> >> > from("direct:start")
> >> >> > .wireTap("direct:x")
> >> >> > .to("file:y");
> >> >> >
> >> >> > from("direct:x")
> >> >> > .delay(2000)
> >> >> > .to("file:z");
> >> >> > }
> >> >> > };
> >> >> > }
> >> >> >
> >> >> > @Override
> >> >> > public String isMockEndpoints() {
> >> >> > return "(file:z|file:y)";
> >> >> > }
> >> >> > }
> >> >> > =============
> >> >> >
> >> >> > If you run the test you can clearly see the temp file deletion
> >> followed
> >> >> by
> >> >> > the closed stream exception:
> >> >> >
> >> >> > Tried 1 to delete file:
> >> >> >
> >> >>
> >>
> /var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp
> >> >> > with result: true
> >> >> >
> >> >> > Cannot reset stream from file
> >> >> >
> >> >>
> >>
> /var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp
> >> >> >
> >> >> > I encountered the same issue during a more complex route that does
> >> some
> >> >> > splitting (zip file) and multicasting. This occurred on Camel
> 2.14.1
> >> so
> >> >> it
> >> >> > could be fixed by https://issues.apache.org/jira/browse/CAMEL-8284
> >> but I
> >> >> > need to test this.
> >> >> >
> >> >> > Kind regards,
> >> >> >
> >> >> > Geert
> >> >>
> >>
>
>
>
> --
> Claus Ibsen
> -----------------
> Red Hat, Inc.
> Email: cibsen@redhat.com
> Twitter: davsclaus
> Blog: http://davsclaus.com
> Author of Camel in Action: http://www.manning.com/ibsen
> hawtio: http://hawt.io/
> fabric8: http://fabric8.io/
>

Re: Closed (removed) StreamCache when doing a Wiretap

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

Yeah we should likely have a StreamCacheHelper or introduce a
copy(boolean clone) method that takes a boolean (with a better name)
that indicate it does a indpendenent copy. Then we can keep the inner
details how this copy does in those stream cache implementations.

The wire tap already does a copy of the stream cache today. But it
likely need that clone copy. We could also make that the default.
Though I think multicast eip does a copy as well but it may reuse the
same underlying file, and only delete it when last exchange is done
and closes it.



On Wed, Apr 22, 2015 at 8:13 AM, Henryk Konsek <he...@gmail.com> wrote:
> Hi,
>
> You can also use Wiretap's onPrepareRef option and use custom processor to
> copy the content of the cached body.
>
> Franz, would you be so kind and create a pull request with your fix?
> Somebody will review it and merge. Thanks in advance!
>
> Cheers!
>
> wt., 21.04.2015 o 16:25 użytkownik Franz Paul Forsthofer <
> emc2fpf@googlemail.com> napisał:
>
>> Hi Geert,
>>
>> it is a bug. You can try as a workaround to set the threshold
>> (streamCachingStrategy.setSpoolThreshold(huge_number);) to a huge
>> number; then the body will be kept in memory.
>>
>> Alternatively, you can modify the code of the Camel class
>> org.apache.camel.processor.WireTapProcessor. You have to modifiy the
>> method configureCopyExchange in the following way:
>>
>>
>>    private Exchange configureCopyExchange(Exchange exchange) throws
>> IOException {
>>
>>
>>         // must use a copy as we dont want it to cause side effects of
>> the original exchange
>>         Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange,
>> false);
>>
>>         if (copy.getIn().getBody() instanceof FileInputStreamCache) {
>>             //the file stream must be copied, otherwise you get errors
>> because the stream file is removed when the parent route is finished
>>             FileInputStreamCache streamCache = (FileInputStreamCache)
>> exchange.getIn().getBody();
>>             CachedOutputStream cos = new CachedOutputStream(copy);
>>             try {
>>               IOHelper.copy(streamCache, cos);
>>             } finally {
>>               IOHelper.close(streamCache, cos);
>>               streamCache.reset();
>>             }
>>             copy.getIn().setBody(cos.newStreamCache());
>>         }
>>         // set MEP to InOnly as this wire tap is a fire and forget
>>         copy.setPattern(ExchangePattern.InOnly);
>>         return copy;
>>     }
>>
>> The idea behind this is to make a copy of the stream cache file, so
>> that you get an additional stream cache file for the second route (in
>> your case for the route "direct:x"). This second stream cache file
>> will be deleted when the second route is finished.
>>
>> I also hope that this issue will be fixed. I am no committer so I
>> cannot say when this issue will be solved; I have made contributions
>> which solved a similar problem in the aggregator and splitter.
>>
>> I think you can open a Jira ticket with the above solution suggestion.
>>
>> Regards Franz
>>
>> On Tue, Apr 21, 2015 at 11:13 AM, Geert Vanheusden
>> <ge...@aviovision.com> wrote:
>> > Hi Franz,
>> >
>> > is this something that will be fixed in an upcoming release? Is it a bug
>> or
>> > does it work as designed?
>> > Can we use a workaround to avoid this behaviour, for example by not
>> > deleting the temp files?
>> >
>> >
>> > Kind regards,
>> >
>> > Geert
>> >
>> > On Tue, Apr 21, 2015 at 10:37 AM, Franz Paul Forsthofer <
>> > emc2fpf@googlemail.com> wrote:
>> >
>> >> Hello Geert,
>> >>
>> >> there is no solution yet for your problem. Currently the stream cache
>> >> file is removed at the end of the route which created the file. In
>> >> your case the stream cache file is deleted when the "direct:start"
>> >> route is finished. The wire tap runs in a separate thread and
>> >> therefore it can happen that it tries to read the cached file when it
>> >> is already deleted, especially when you have a delay in the wiretap
>> >> route ("direct:x").
>> >>
>> >>
>> >> Regards Franz
>> >>
>> >> On Fri, Apr 17, 2015 at 6:05 PM, Geert Vanheusden
>> >> <ge...@aviovision.com> wrote:
>> >> > Hi,
>> >> >
>> >> > I noticed a bug where the body (StreamCache) was already removed
>> before
>> >> the
>> >> > exchange reached the end (in the Wiretap route).
>> >> >
>> >> > I found the following ticket
>> >> > https://issues.apache.org/jira/browse/CAMEL-8386 and code
>> >> >
>> >>
>> https://fisheye6.atlassian.com/changelog/camel-git?cs=4661cbb94513d6047e58581b23dcd4a6fad166f7
>> >> > but I think it still doesn't fix the Wiretap problem.
>> >> >
>> >> > Here you can find my test (executed on 2.15.1). If you disable the
>> >> > StreamCaching or remove the delay it works, enabling it again will
>> break
>> >> > the test.
>> >> >
>> >> > ============
>> >> > import org.apache.camel.builder.RouteBuilder;
>> >> > import org.apache.camel.component.mock.MockEndpoint;
>> >> > import org.apache.camel.impl.DefaultStreamCachingStrategy;
>> >> > import org.apache.camel.spi.StreamCachingStrategy;
>> >> > import org.apache.camel.test.junit4.CamelTestSupport;
>> >> > import org.junit.Before;
>> >> > import org.junit.Test;
>> >> >
>> >> > public class WireTapTest extends CamelTestSupport {
>> >> >
>> >> > private MockEndpoint y;
>> >> > private MockEndpoint z;
>> >> >
>> >> > @Before
>> >> > public void prepareEndpoints() {
>> >> > y = getMockEndpoint("mock:file:y");
>> >> > z = getMockEndpoint("mock:file:z");
>> >> > }
>> >> >
>> >> > @Test
>> >> > public void
>> >> >
>> >>
>> testSendingAMessageUsingWiretapShouldNotDeleteStreamBeforeWiretappedExcangeIsComplete()
>> >> > throws InterruptedException {
>> >> > y.expectedMessageCount(1);
>> >> > z.expectedMessageCount(1);
>> >> >
>> >> > // test.txt should contain more than one character
>> >> > template.sendBody("direct:start",
>> >> > this.getClass().getResourceAsStream("/test.txt"));
>> >> >
>> >> > assertMockEndpointsSatisfied();
>> >> > }
>> >> >
>> >> > @Override
>> >> > protected RouteBuilder createRouteBuilder() throws Exception {
>> >> > return new RouteBuilder() {
>> >> > @Override
>> >> > public void configure() throws Exception {
>> >> > StreamCachingStrategy streamCachingStrategy = new
>> >> > DefaultStreamCachingStrategy();
>> >> > streamCachingStrategy.setSpoolThreshold(1);
>> >> > context.setStreamCachingStrategy(streamCachingStrategy);
>> >> > context.setStreamCaching(true);
>> >> >
>> >> > from("direct:start")
>> >> > .wireTap("direct:x")
>> >> > .to("file:y");
>> >> >
>> >> > from("direct:x")
>> >> > .delay(2000)
>> >> > .to("file:z");
>> >> > }
>> >> > };
>> >> > }
>> >> >
>> >> > @Override
>> >> > public String isMockEndpoints() {
>> >> > return "(file:z|file:y)";
>> >> > }
>> >> > }
>> >> > =============
>> >> >
>> >> > If you run the test you can clearly see the temp file deletion
>> followed
>> >> by
>> >> > the closed stream exception:
>> >> >
>> >> > Tried 1 to delete file:
>> >> >
>> >>
>> /var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp
>> >> > with result: true
>> >> >
>> >> > Cannot reset stream from file
>> >> >
>> >>
>> /var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp
>> >> >
>> >> > I encountered the same issue during a more complex route that does
>> some
>> >> > splitting (zip file) and multicasting. This occurred on Camel 2.14.1
>> so
>> >> it
>> >> > could be fixed by https://issues.apache.org/jira/browse/CAMEL-8284
>> but I
>> >> > need to test this.
>> >> >
>> >> > Kind regards,
>> >> >
>> >> > Geert
>> >>
>>



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cibsen@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
hawtio: http://hawt.io/
fabric8: http://fabric8.io/

Re: Closed (removed) StreamCache when doing a Wiretap

Posted by Henryk Konsek <he...@gmail.com>.
Hi,

You can also use Wiretap's onPrepareRef option and use custom processor to
copy the content of the cached body.

Franz, would you be so kind and create a pull request with your fix?
Somebody will review it and merge. Thanks in advance!

Cheers!

wt., 21.04.2015 o 16:25 użytkownik Franz Paul Forsthofer <
emc2fpf@googlemail.com> napisał:

> Hi Geert,
>
> it is a bug. You can try as a workaround to set the threshold
> (streamCachingStrategy.setSpoolThreshold(huge_number);) to a huge
> number; then the body will be kept in memory.
>
> Alternatively, you can modify the code of the Camel class
> org.apache.camel.processor.WireTapProcessor. You have to modifiy the
> method configureCopyExchange in the following way:
>
>
>    private Exchange configureCopyExchange(Exchange exchange) throws
> IOException {
>
>
>         // must use a copy as we dont want it to cause side effects of
> the original exchange
>         Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange,
> false);
>
>         if (copy.getIn().getBody() instanceof FileInputStreamCache) {
>             //the file stream must be copied, otherwise you get errors
> because the stream file is removed when the parent route is finished
>             FileInputStreamCache streamCache = (FileInputStreamCache)
> exchange.getIn().getBody();
>             CachedOutputStream cos = new CachedOutputStream(copy);
>             try {
>               IOHelper.copy(streamCache, cos);
>             } finally {
>               IOHelper.close(streamCache, cos);
>               streamCache.reset();
>             }
>             copy.getIn().setBody(cos.newStreamCache());
>         }
>         // set MEP to InOnly as this wire tap is a fire and forget
>         copy.setPattern(ExchangePattern.InOnly);
>         return copy;
>     }
>
> The idea behind this is to make a copy of the stream cache file, so
> that you get an additional stream cache file for the second route (in
> your case for the route "direct:x"). This second stream cache file
> will be deleted when the second route is finished.
>
> I also hope that this issue will be fixed. I am no committer so I
> cannot say when this issue will be solved; I have made contributions
> which solved a similar problem in the aggregator and splitter.
>
> I think you can open a Jira ticket with the above solution suggestion.
>
> Regards Franz
>
> On Tue, Apr 21, 2015 at 11:13 AM, Geert Vanheusden
> <ge...@aviovision.com> wrote:
> > Hi Franz,
> >
> > is this something that will be fixed in an upcoming release? Is it a bug
> or
> > does it work as designed?
> > Can we use a workaround to avoid this behaviour, for example by not
> > deleting the temp files?
> >
> >
> > Kind regards,
> >
> > Geert
> >
> > On Tue, Apr 21, 2015 at 10:37 AM, Franz Paul Forsthofer <
> > emc2fpf@googlemail.com> wrote:
> >
> >> Hello Geert,
> >>
> >> there is no solution yet for your problem. Currently the stream cache
> >> file is removed at the end of the route which created the file. In
> >> your case the stream cache file is deleted when the "direct:start"
> >> route is finished. The wire tap runs in a separate thread and
> >> therefore it can happen that it tries to read the cached file when it
> >> is already deleted, especially when you have a delay in the wiretap
> >> route ("direct:x").
> >>
> >>
> >> Regards Franz
> >>
> >> On Fri, Apr 17, 2015 at 6:05 PM, Geert Vanheusden
> >> <ge...@aviovision.com> wrote:
> >> > Hi,
> >> >
> >> > I noticed a bug where the body (StreamCache) was already removed
> before
> >> the
> >> > exchange reached the end (in the Wiretap route).
> >> >
> >> > I found the following ticket
> >> > https://issues.apache.org/jira/browse/CAMEL-8386 and code
> >> >
> >>
> https://fisheye6.atlassian.com/changelog/camel-git?cs=4661cbb94513d6047e58581b23dcd4a6fad166f7
> >> > but I think it still doesn't fix the Wiretap problem.
> >> >
> >> > Here you can find my test (executed on 2.15.1). If you disable the
> >> > StreamCaching or remove the delay it works, enabling it again will
> break
> >> > the test.
> >> >
> >> > ============
> >> > import org.apache.camel.builder.RouteBuilder;
> >> > import org.apache.camel.component.mock.MockEndpoint;
> >> > import org.apache.camel.impl.DefaultStreamCachingStrategy;
> >> > import org.apache.camel.spi.StreamCachingStrategy;
> >> > import org.apache.camel.test.junit4.CamelTestSupport;
> >> > import org.junit.Before;
> >> > import org.junit.Test;
> >> >
> >> > public class WireTapTest extends CamelTestSupport {
> >> >
> >> > private MockEndpoint y;
> >> > private MockEndpoint z;
> >> >
> >> > @Before
> >> > public void prepareEndpoints() {
> >> > y = getMockEndpoint("mock:file:y");
> >> > z = getMockEndpoint("mock:file:z");
> >> > }
> >> >
> >> > @Test
> >> > public void
> >> >
> >>
> testSendingAMessageUsingWiretapShouldNotDeleteStreamBeforeWiretappedExcangeIsComplete()
> >> > throws InterruptedException {
> >> > y.expectedMessageCount(1);
> >> > z.expectedMessageCount(1);
> >> >
> >> > // test.txt should contain more than one character
> >> > template.sendBody("direct:start",
> >> > this.getClass().getResourceAsStream("/test.txt"));
> >> >
> >> > assertMockEndpointsSatisfied();
> >> > }
> >> >
> >> > @Override
> >> > protected RouteBuilder createRouteBuilder() throws Exception {
> >> > return new RouteBuilder() {
> >> > @Override
> >> > public void configure() throws Exception {
> >> > StreamCachingStrategy streamCachingStrategy = new
> >> > DefaultStreamCachingStrategy();
> >> > streamCachingStrategy.setSpoolThreshold(1);
> >> > context.setStreamCachingStrategy(streamCachingStrategy);
> >> > context.setStreamCaching(true);
> >> >
> >> > from("direct:start")
> >> > .wireTap("direct:x")
> >> > .to("file:y");
> >> >
> >> > from("direct:x")
> >> > .delay(2000)
> >> > .to("file:z");
> >> > }
> >> > };
> >> > }
> >> >
> >> > @Override
> >> > public String isMockEndpoints() {
> >> > return "(file:z|file:y)";
> >> > }
> >> > }
> >> > =============
> >> >
> >> > If you run the test you can clearly see the temp file deletion
> followed
> >> by
> >> > the closed stream exception:
> >> >
> >> > Tried 1 to delete file:
> >> >
> >>
> /var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp
> >> > with result: true
> >> >
> >> > Cannot reset stream from file
> >> >
> >>
> /var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp
> >> >
> >> > I encountered the same issue during a more complex route that does
> some
> >> > splitting (zip file) and multicasting. This occurred on Camel 2.14.1
> so
> >> it
> >> > could be fixed by https://issues.apache.org/jira/browse/CAMEL-8284
> but I
> >> > need to test this.
> >> >
> >> > Kind regards,
> >> >
> >> > Geert
> >>
>

Re: Closed (removed) StreamCache when doing a Wiretap

Posted by Franz Paul Forsthofer <em...@googlemail.com>.
Hi Geert,

it is a bug. You can try as a workaround to set the threshold
(streamCachingStrategy.setSpoolThreshold(huge_number);) to a huge
number; then the body will be kept in memory.

Alternatively, you can modify the code of the Camel class
org.apache.camel.processor.WireTapProcessor. You have to modifiy the
method configureCopyExchange in the following way:


   private Exchange configureCopyExchange(Exchange exchange) throws
IOException {


        // must use a copy as we dont want it to cause side effects of
the original exchange
        Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);

        if (copy.getIn().getBody() instanceof FileInputStreamCache) {
            //the file stream must be copied, otherwise you get errors
because the stream file is removed when the parent route is finished
            FileInputStreamCache streamCache = (FileInputStreamCache)
exchange.getIn().getBody();
            CachedOutputStream cos = new CachedOutputStream(copy);
            try {
              IOHelper.copy(streamCache, cos);
            } finally {
              IOHelper.close(streamCache, cos);
              streamCache.reset();
            }
            copy.getIn().setBody(cos.newStreamCache());
        }
        // set MEP to InOnly as this wire tap is a fire and forget
        copy.setPattern(ExchangePattern.InOnly);
        return copy;
    }

The idea behind this is to make a copy of the stream cache file, so
that you get an additional stream cache file for the second route (in
your case for the route "direct:x"). This second stream cache file
will be deleted when the second route is finished.

I also hope that this issue will be fixed. I am no committer so I
cannot say when this issue will be solved; I have made contributions
which solved a similar problem in the aggregator and splitter.

I think you can open a Jira ticket with the above solution suggestion.

Regards Franz

On Tue, Apr 21, 2015 at 11:13 AM, Geert Vanheusden
<ge...@aviovision.com> wrote:
> Hi Franz,
>
> is this something that will be fixed in an upcoming release? Is it a bug or
> does it work as designed?
> Can we use a workaround to avoid this behaviour, for example by not
> deleting the temp files?
>
>
> Kind regards,
>
> Geert
>
> On Tue, Apr 21, 2015 at 10:37 AM, Franz Paul Forsthofer <
> emc2fpf@googlemail.com> wrote:
>
>> Hello Geert,
>>
>> there is no solution yet for your problem. Currently the stream cache
>> file is removed at the end of the route which created the file. In
>> your case the stream cache file is deleted when the "direct:start"
>> route is finished. The wire tap runs in a separate thread and
>> therefore it can happen that it tries to read the cached file when it
>> is already deleted, especially when you have a delay in the wiretap
>> route ("direct:x").
>>
>>
>> Regards Franz
>>
>> On Fri, Apr 17, 2015 at 6:05 PM, Geert Vanheusden
>> <ge...@aviovision.com> wrote:
>> > Hi,
>> >
>> > I noticed a bug where the body (StreamCache) was already removed before
>> the
>> > exchange reached the end (in the Wiretap route).
>> >
>> > I found the following ticket
>> > https://issues.apache.org/jira/browse/CAMEL-8386 and code
>> >
>> https://fisheye6.atlassian.com/changelog/camel-git?cs=4661cbb94513d6047e58581b23dcd4a6fad166f7
>> > but I think it still doesn't fix the Wiretap problem.
>> >
>> > Here you can find my test (executed on 2.15.1). If you disable the
>> > StreamCaching or remove the delay it works, enabling it again will break
>> > the test.
>> >
>> > ============
>> > import org.apache.camel.builder.RouteBuilder;
>> > import org.apache.camel.component.mock.MockEndpoint;
>> > import org.apache.camel.impl.DefaultStreamCachingStrategy;
>> > import org.apache.camel.spi.StreamCachingStrategy;
>> > import org.apache.camel.test.junit4.CamelTestSupport;
>> > import org.junit.Before;
>> > import org.junit.Test;
>> >
>> > public class WireTapTest extends CamelTestSupport {
>> >
>> > private MockEndpoint y;
>> > private MockEndpoint z;
>> >
>> > @Before
>> > public void prepareEndpoints() {
>> > y = getMockEndpoint("mock:file:y");
>> > z = getMockEndpoint("mock:file:z");
>> > }
>> >
>> > @Test
>> > public void
>> >
>> testSendingAMessageUsingWiretapShouldNotDeleteStreamBeforeWiretappedExcangeIsComplete()
>> > throws InterruptedException {
>> > y.expectedMessageCount(1);
>> > z.expectedMessageCount(1);
>> >
>> > // test.txt should contain more than one character
>> > template.sendBody("direct:start",
>> > this.getClass().getResourceAsStream("/test.txt"));
>> >
>> > assertMockEndpointsSatisfied();
>> > }
>> >
>> > @Override
>> > protected RouteBuilder createRouteBuilder() throws Exception {
>> > return new RouteBuilder() {
>> > @Override
>> > public void configure() throws Exception {
>> > StreamCachingStrategy streamCachingStrategy = new
>> > DefaultStreamCachingStrategy();
>> > streamCachingStrategy.setSpoolThreshold(1);
>> > context.setStreamCachingStrategy(streamCachingStrategy);
>> > context.setStreamCaching(true);
>> >
>> > from("direct:start")
>> > .wireTap("direct:x")
>> > .to("file:y");
>> >
>> > from("direct:x")
>> > .delay(2000)
>> > .to("file:z");
>> > }
>> > };
>> > }
>> >
>> > @Override
>> > public String isMockEndpoints() {
>> > return "(file:z|file:y)";
>> > }
>> > }
>> > =============
>> >
>> > If you run the test you can clearly see the temp file deletion followed
>> by
>> > the closed stream exception:
>> >
>> > Tried 1 to delete file:
>> >
>> /var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp
>> > with result: true
>> >
>> > Cannot reset stream from file
>> >
>> /var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp
>> >
>> > I encountered the same issue during a more complex route that does some
>> > splitting (zip file) and multicasting. This occurred on Camel 2.14.1 so
>> it
>> > could be fixed by https://issues.apache.org/jira/browse/CAMEL-8284 but I
>> > need to test this.
>> >
>> > Kind regards,
>> >
>> > Geert
>>

Re: Closed (removed) StreamCache when doing a Wiretap

Posted by Geert Vanheusden <ge...@aviovision.com>.
Hi Franz,

is this something that will be fixed in an upcoming release? Is it a bug or
does it work as designed?
Can we use a workaround to avoid this behaviour, for example by not
deleting the temp files?


Kind regards,

Geert

On Tue, Apr 21, 2015 at 10:37 AM, Franz Paul Forsthofer <
emc2fpf@googlemail.com> wrote:

> Hello Geert,
>
> there is no solution yet for your problem. Currently the stream cache
> file is removed at the end of the route which created the file. In
> your case the stream cache file is deleted when the "direct:start"
> route is finished. The wire tap runs in a separate thread and
> therefore it can happen that it tries to read the cached file when it
> is already deleted, especially when you have a delay in the wiretap
> route ("direct:x").
>
>
> Regards Franz
>
> On Fri, Apr 17, 2015 at 6:05 PM, Geert Vanheusden
> <ge...@aviovision.com> wrote:
> > Hi,
> >
> > I noticed a bug where the body (StreamCache) was already removed before
> the
> > exchange reached the end (in the Wiretap route).
> >
> > I found the following ticket
> > https://issues.apache.org/jira/browse/CAMEL-8386 and code
> >
> https://fisheye6.atlassian.com/changelog/camel-git?cs=4661cbb94513d6047e58581b23dcd4a6fad166f7
> > but I think it still doesn't fix the Wiretap problem.
> >
> > Here you can find my test (executed on 2.15.1). If you disable the
> > StreamCaching or remove the delay it works, enabling it again will break
> > the test.
> >
> > ============
> > import org.apache.camel.builder.RouteBuilder;
> > import org.apache.camel.component.mock.MockEndpoint;
> > import org.apache.camel.impl.DefaultStreamCachingStrategy;
> > import org.apache.camel.spi.StreamCachingStrategy;
> > import org.apache.camel.test.junit4.CamelTestSupport;
> > import org.junit.Before;
> > import org.junit.Test;
> >
> > public class WireTapTest extends CamelTestSupport {
> >
> > private MockEndpoint y;
> > private MockEndpoint z;
> >
> > @Before
> > public void prepareEndpoints() {
> > y = getMockEndpoint("mock:file:y");
> > z = getMockEndpoint("mock:file:z");
> > }
> >
> > @Test
> > public void
> >
> testSendingAMessageUsingWiretapShouldNotDeleteStreamBeforeWiretappedExcangeIsComplete()
> > throws InterruptedException {
> > y.expectedMessageCount(1);
> > z.expectedMessageCount(1);
> >
> > // test.txt should contain more than one character
> > template.sendBody("direct:start",
> > this.getClass().getResourceAsStream("/test.txt"));
> >
> > assertMockEndpointsSatisfied();
> > }
> >
> > @Override
> > protected RouteBuilder createRouteBuilder() throws Exception {
> > return new RouteBuilder() {
> > @Override
> > public void configure() throws Exception {
> > StreamCachingStrategy streamCachingStrategy = new
> > DefaultStreamCachingStrategy();
> > streamCachingStrategy.setSpoolThreshold(1);
> > context.setStreamCachingStrategy(streamCachingStrategy);
> > context.setStreamCaching(true);
> >
> > from("direct:start")
> > .wireTap("direct:x")
> > .to("file:y");
> >
> > from("direct:x")
> > .delay(2000)
> > .to("file:z");
> > }
> > };
> > }
> >
> > @Override
> > public String isMockEndpoints() {
> > return "(file:z|file:y)";
> > }
> > }
> > =============
> >
> > If you run the test you can clearly see the temp file deletion followed
> by
> > the closed stream exception:
> >
> > Tried 1 to delete file:
> >
> /var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp
> > with result: true
> >
> > Cannot reset stream from file
> >
> /var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp
> >
> > I encountered the same issue during a more complex route that does some
> > splitting (zip file) and multicasting. This occurred on Camel 2.14.1 so
> it
> > could be fixed by https://issues.apache.org/jira/browse/CAMEL-8284 but I
> > need to test this.
> >
> > Kind regards,
> >
> > Geert
>

Re: Closed (removed) StreamCache when doing a Wiretap

Posted by Franz Paul Forsthofer <em...@googlemail.com>.
Hello Geert,

there is no solution yet for your problem. Currently the stream cache
file is removed at the end of the route which created the file. In
your case the stream cache file is deleted when the "direct:start"
route is finished. The wire tap runs in a separate thread and
therefore it can happen that it tries to read the cached file when it
is already deleted, especially when you have a delay in the wiretap
route ("direct:x").


Regards Franz

On Fri, Apr 17, 2015 at 6:05 PM, Geert Vanheusden
<ge...@aviovision.com> wrote:
> Hi,
>
> I noticed a bug where the body (StreamCache) was already removed before the
> exchange reached the end (in the Wiretap route).
>
> I found the following ticket
> https://issues.apache.org/jira/browse/CAMEL-8386 and code
> https://fisheye6.atlassian.com/changelog/camel-git?cs=4661cbb94513d6047e58581b23dcd4a6fad166f7
> but I think it still doesn't fix the Wiretap problem.
>
> Here you can find my test (executed on 2.15.1). If you disable the
> StreamCaching or remove the delay it works, enabling it again will break
> the test.
>
> ============
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.mock.MockEndpoint;
> import org.apache.camel.impl.DefaultStreamCachingStrategy;
> import org.apache.camel.spi.StreamCachingStrategy;
> import org.apache.camel.test.junit4.CamelTestSupport;
> import org.junit.Before;
> import org.junit.Test;
>
> public class WireTapTest extends CamelTestSupport {
>
> private MockEndpoint y;
> private MockEndpoint z;
>
> @Before
> public void prepareEndpoints() {
> y = getMockEndpoint("mock:file:y");
> z = getMockEndpoint("mock:file:z");
> }
>
> @Test
> public void
> testSendingAMessageUsingWiretapShouldNotDeleteStreamBeforeWiretappedExcangeIsComplete()
> throws InterruptedException {
> y.expectedMessageCount(1);
> z.expectedMessageCount(1);
>
> // test.txt should contain more than one character
> template.sendBody("direct:start",
> this.getClass().getResourceAsStream("/test.txt"));
>
> assertMockEndpointsSatisfied();
> }
>
> @Override
> protected RouteBuilder createRouteBuilder() throws Exception {
> return new RouteBuilder() {
> @Override
> public void configure() throws Exception {
> StreamCachingStrategy streamCachingStrategy = new
> DefaultStreamCachingStrategy();
> streamCachingStrategy.setSpoolThreshold(1);
> context.setStreamCachingStrategy(streamCachingStrategy);
> context.setStreamCaching(true);
>
> from("direct:start")
> .wireTap("direct:x")
> .to("file:y");
>
> from("direct:x")
> .delay(2000)
> .to("file:z");
> }
> };
> }
>
> @Override
> public String isMockEndpoints() {
> return "(file:z|file:y)";
> }
> }
> =============
>
> If you run the test you can clearly see the temp file deletion followed by
> the closed stream exception:
>
> Tried 1 to delete file:
> /var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp
> with result: true
>
> Cannot reset stream from file
> /var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp
>
> I encountered the same issue during a more complex route that does some
> splitting (zip file) and multicasting. This occurred on Camel 2.14.1 so it
> could be fixed by https://issues.apache.org/jira/browse/CAMEL-8284 but I
> need to test this.
>
> Kind regards,
>
> Geert