You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@camel.apache.org by "Krzysztof Szafrański (JIRA)" <ji...@apache.org> on 2018/02/08 14:29:00 UTC

[jira] [Updated] (CAMEL-12244) RemoteFileProducer stopped instead of being released to the pool when "interceptSendToEndpoint" is used

     [ https://issues.apache.org/jira/browse/CAMEL-12244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Krzysztof Szafrański updated CAMEL-12244:
-----------------------------------------
    Description: 
In our application we're using an SFTP producer with "fileExist=Move" and a specific "moveExisting" expression. I encountered a problem where this would sometimes work, and sometimes not (i.e. there would be no ".archived" file). Upon further investigation I found the problem and it seems to be a bug in Camel.

Our SFTP endpoint looks like this:
{code:none}
sftp://...:.../...?username=...&privateKeyPassphrase=...&privateKeyFile=...&useUserKnownHostsFile=false&jschLoggingLevel=ERROR&fileExist=Move&moveExisting=${file:name}.archived${date:now:yyyyMMddHHmmssSSS}
{code}

We also have an interceptor:
{code:none}
route.interceptSendToEndpoint("sftp://.*").process(exchange -> LOG.info("Sending file {} to {}", ...));
{code}

As I discovered, using the interceptor wraps the RemoteFileProducer with InterceptSendToEndpoint. This however changes the behavior of the ProducerCache:
{code}
public boolean doInAsyncProducer(...) {
    ...
    return producerCallback.doInAsyncProducer(producer, asyncProcessor, exchange, pattern, doneSync -> {
        ...
        if (producer instanceof ServicePoolAware) {
            // release back to the pool
            pool.release(endpoint, producer);
        } else if (!producer.isSingleton()) {
            // stop and shutdown non-singleton producers as we should not leak resources
            try {
                ServiceHelper.stopAndShutdownService(producer);
            } catch (Exception e) {
                ...
            }
        }
        ...
    });
    ...
}
{code}

RemoteFileProducer implements ServicePoolAware so it would normally go back to the pool, but InterceptSendToEndpoint _does not_. As a result, our producers keep getting stopped (note that RemoteFileProducer#isSingleton always returns false).

What's more, somehow they _are_ being reused and in the end we run into situations, where one thread is closing a producer, while another thread is trying to write with it.

I set up some breakpoints that log the thread name and System#identityHashCode of the producer:
{code}
2018-02-08 15:05:25.070 TRACE o.a.c.c.file.remote.RemoteFileProducer     : Starting producer: RemoteFileProducer[...]
2018-02-08 15:05:25.073 TRACE o.a.c.c.file.remote.RemoteFileProducer     : Processing file: [my_file] for exchange: ...
2018-02-08 15:05:25.073 DEBUG o.a.c.c.file.remote.RemoteFileProducer     : Not already connected/logged in. Connecting to: ...
doStop(), time: 1518098725112,  thread [Camel (camel-1) thread #35 - CamelInvocationHandler], producer: 889747012
	at org.apache.camel.component.file.remote.RemoteFileProducer.doStop(RemoteFileProducer.java:175)
	at org.apache.camel.support.ServiceSupport.stop(ServiceSupport.java:102)
	at org.apache.camel.util.ServiceHelper.stopService(ServiceHelper.java:142)
	at org.apache.camel.impl.InterceptSendToEndpoint$1.stop(InterceptSendToEndpoint.java:196)
	at org.apache.camel.support.ServiceSupport.shutdown(ServiceSupport.java:164)
	at org.apache.camel.util.ServiceHelper.stopAndShutdownService(ServiceHelper.java:211)
	at org.apache.camel.impl.ProducerCache.lambda$doInAsyncProducer$2(ProducerCache.java:450)
	at org.apache.camel.processor.SendProcessor$2$1.done(SendProcessor.java:178)
	at org.apache.camel.impl.InterceptSendToEndpoint$1.process(InterceptSendToEndpoint.java:171)
	at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:173)
	at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:436)
	at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:168)
	at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110)
	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
	at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:695)
	at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:623)
	at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:247)
	at org.apache.camel.processor.Splitter.process(Splitter.java:114)
	at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110)
	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
	at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:47)
	at org.apache.camel.impl.DeferProducer.process(DeferProducer.java:72)
	at org.apache.camel.component.bean.AbstractCamelInvocationHandler$1.call(AbstractCamelInvocationHandler.java:192)
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	at java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	at java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
2018-02-08 15:05:25.966 DEBUG o.a.c.c.file.remote.RemoteFileProducer     : Connected and logged in to: ...
2018-02-08 15:05:25.966 DEBUG o.a.c.c.file.remote.RemoteFileProducer     : Disconnecting from: ...
2018-02-08 15:05:25.973 TRACE o.a.c.c.file.remote.RemoteFileProducer     : About to write [my_file] to [...] from exchange [...]
2018-02-08 15:05:25.974 TRACE o.a.c.c.file.remote.RemoteFileProducer     : Stopping producer: RemoteFileProducer[...]
2018-02-08 15:05:25.974 DEBUG o.a.c.c.file.remote.RemoteFileProducer     : Starting
2018-02-08 15:05:25.974 TRACE o.a.c.c.file.remote.RemoteFileProducer     : Starting producer: RemoteFileProducer[...]
2018-02-08 15:05:25.977 TRACE o.a.c.c.file.remote.RemoteFileProducer     : Processing file: [another_file] for exchange: Exchange[...]
2018-02-08 15:05:25.977 DEBUG o.a.c.c.file.remote.RemoteFileProducer     : Not already connected/logged in. Connecting to: ...
handleFailedWrite(), time: 1518098726072, thread [Camel (camel-1) thread #37 - CamelInvocationHandler], producer: 889747012
	at org.apache.camel.component.file.remote.RemoteFileProducer.handleFailedWrite(RemoteFileProducer.java:81)
	at org.apache.camel.component.file.GenericFileProducer.processExchange(GenericFileProducer.java:227)
	at org.apache.camel.component.file.remote.RemoteFileProducer.process(RemoteFileProducer.java:58)
	at org.apache.camel.impl.InterceptSendToEndpoint$1.process(InterceptSendToEndpoint.java:167)
	at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:173)
	at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:436)
	at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:168)
	at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110)
	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
	at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:695)
	at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:623)
	at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:247)
	at org.apache.camel.processor.Splitter.process(Splitter.java:114)
	at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110)
	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
	at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:47)
	at org.apache.camel.impl.DeferProducer.process(DeferProducer.java:72)
	at org.apache.camel.component.bean.AbstractCamelInvocationHandler$1.call(AbstractCamelInvocationHandler.java:192)
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	at java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	at java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.camel.component.file.GenericFileOperationFailedException: Cannot change directory to: [my_directory]
	at org.apache.camel.component.file.remote.SftpOperations.doChangeDirectory(SftpOperations.java:596)
	at org.apache.camel.component.file.remote.SftpOperations.changeCurrentDirectory(SftpOperations.java:584)
	at org.apache.camel.component.file.remote.SftpOperations.storeFile(SftpOperations.java:830)
	at org.apache.camel.component.file.GenericFileProducer.writeFile(GenericFileProducer.java:277)
	at org.apache.camel.component.file.GenericFileProducer.processExchange(GenericFileProducer.java:165)
	... 39 more
Caused by: 4:
	at com.jcraft.jsch.ChannelSftp.cd(ChannelSftp.java:359)
	at org.apache.camel.component.file.remote.SftpOperations.doChangeDirectory(SftpOperations.java:594)
	... 43 more
Caused by: java.io.IOException: Pipe closed
	at java.io.PipedInputStream.read(PipedInputStream.java:307)
	at com.jcraft.jsch.Channel$MyPipedInputStream.updateReadSide(Channel.java:362)
	at com.jcraft.jsch.ChannelSftp.cd(ChannelSftp.java:337)
	... 44 more
2018-02-08 15:05:26.186 DEBUG o.a.c.c.file.remote.RemoteFileProducer     : Exception occurred during stopping: Cannot change directory to: [my_directory]
{code}
So thread #35 stopped the producer, while thread #37 was trying to use it.

One more ugly thing about it, is that when SftpOperations fail due to a closed pipe, by the time it gets to RemoteFileProducer#handleFailedWrite:
{code}
public void handleFailedWrite(...) throws Exception {
    ...
    if (isStopping() || isStopped()) {
        // if we are stopping then ignore any exception during a poll
        log.debug("Exception occurred during stopping: " + exception.getMessage());
    } else {
        log.warn("Writing file failed with: " + exception.getMessage());
        ...
        throw exception;
    }
}
{code}
the producer is already stopped, *so the exception is logged on DEBUG and not rethrown*.

Note that I'm writing multiple files in parallel (three in this case), I'm using this to send data to the route ending in the SFTP endpoint:
{code}
@Produce(uri = "direct:myDir")
private MyDir myDir;
...
myDir.sendAsync(...)
{code}
where
{code}
public interface MyDir {
    Future<?> sendAsync(...);
}
{code}

We're using Camel 2.19.0, but so far that I've looked at the github repository, the issue is most likely present in the current version too.

  was:
In our application we're using an SFTP producer with "fileExist=Move" and a specific "moveExisting" expression. I encountered a problem where this would sometimes work, and sometimes not (i.e. there would be no ".archived" file). Upon further investigation I found the problem and it seems to be a bug in Camel.

Our SFTP endpoint looks like this:
{code:none}
sftp://...:.../...?username=...&privateKeyPassphrase=...&privateKeyFile=...&useUserKnownHostsFile=false&jschLoggingLevel=ERROR&fileExist=Move&moveExisting=${file:name}.archived${date:now:yyyyMMddHHmmssSSS}
{code}

We also have an interceptor:
{code:none}
route.interceptSendToEndpoint("sftp://.*").process(exchange -> LOG.info("Sending file {} to {}", ...));
{code}

As I discovered, using the interceptor wraps the RemoteFileProducer with InterceptSendToEndpoint. This however changes the behavior of the ProducerCache:
{code}
public boolean doInAsyncProducer(...) {
    ...
    return producerCallback.doInAsyncProducer(producer, asyncProcessor, exchange, pattern, doneSync -> {
        ...
        if (producer instanceof ServicePoolAware) {
            // release back to the pool
            pool.release(endpoint, producer);
        } else if (!producer.isSingleton()) {
            // stop and shutdown non-singleton producers as we should not leak resources
            try {
                ServiceHelper.stopAndShutdownService(producer);
            } catch (Exception e) {
                ...
            }
        }
        ...
    });
    ...
}
{code}

RemoteFileProducer implements ServicePoolAware so it would normally go back to the pool, but InterceptSendToEndpoint _does not_. As a result, our producers keep getting stopped.

What's more, somehow they _are_ being reused and in the end we run into situations, where one thread is closing a producer, while another thread is trying to write with it.

I set up some breakpoints that log the thread name and System#identityHashCode of the producer:
{code}
2018-02-08 15:05:25.070 TRACE o.a.c.c.file.remote.RemoteFileProducer     : Starting producer: RemoteFileProducer[...]
2018-02-08 15:05:25.073 TRACE o.a.c.c.file.remote.RemoteFileProducer     : Processing file: [my_file] for exchange: ...
2018-02-08 15:05:25.073 DEBUG o.a.c.c.file.remote.RemoteFileProducer     : Not already connected/logged in. Connecting to: ...
doStop(), time: 1518098725112,  thread [Camel (camel-1) thread #35 - CamelInvocationHandler], producer: 889747012
	at org.apache.camel.component.file.remote.RemoteFileProducer.doStop(RemoteFileProducer.java:175)
	at org.apache.camel.support.ServiceSupport.stop(ServiceSupport.java:102)
	at org.apache.camel.util.ServiceHelper.stopService(ServiceHelper.java:142)
	at org.apache.camel.impl.InterceptSendToEndpoint$1.stop(InterceptSendToEndpoint.java:196)
	at org.apache.camel.support.ServiceSupport.shutdown(ServiceSupport.java:164)
	at org.apache.camel.util.ServiceHelper.stopAndShutdownService(ServiceHelper.java:211)
	at org.apache.camel.impl.ProducerCache.lambda$doInAsyncProducer$2(ProducerCache.java:450)
	at org.apache.camel.processor.SendProcessor$2$1.done(SendProcessor.java:178)
	at org.apache.camel.impl.InterceptSendToEndpoint$1.process(InterceptSendToEndpoint.java:171)
	at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:173)
	at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:436)
	at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:168)
	at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110)
	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
	at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:695)
	at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:623)
	at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:247)
	at org.apache.camel.processor.Splitter.process(Splitter.java:114)
	at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110)
	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
	at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:47)
	at org.apache.camel.impl.DeferProducer.process(DeferProducer.java:72)
	at org.apache.camel.component.bean.AbstractCamelInvocationHandler$1.call(AbstractCamelInvocationHandler.java:192)
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	at java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	at java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
2018-02-08 15:05:25.966 DEBUG o.a.c.c.file.remote.RemoteFileProducer     : Connected and logged in to: ...
2018-02-08 15:05:25.966 DEBUG o.a.c.c.file.remote.RemoteFileProducer     : Disconnecting from: ...
2018-02-08 15:05:25.973 TRACE o.a.c.c.file.remote.RemoteFileProducer     : About to write [my_file] to [...] from exchange [...]
2018-02-08 15:05:25.974 TRACE o.a.c.c.file.remote.RemoteFileProducer     : Stopping producer: RemoteFileProducer[...]
2018-02-08 15:05:25.974 DEBUG o.a.c.c.file.remote.RemoteFileProducer     : Starting
2018-02-08 15:05:25.974 TRACE o.a.c.c.file.remote.RemoteFileProducer     : Starting producer: RemoteFileProducer[...]
2018-02-08 15:05:25.977 TRACE o.a.c.c.file.remote.RemoteFileProducer     : Processing file: [another_file] for exchange: Exchange[...]
2018-02-08 15:05:25.977 DEBUG o.a.c.c.file.remote.RemoteFileProducer     : Not already connected/logged in. Connecting to: ...
handleFailedWrite(), time: 1518098726072, thread [Camel (camel-1) thread #37 - CamelInvocationHandler], producer: 889747012
	at org.apache.camel.component.file.remote.RemoteFileProducer.handleFailedWrite(RemoteFileProducer.java:81)
	at org.apache.camel.component.file.GenericFileProducer.processExchange(GenericFileProducer.java:227)
	at org.apache.camel.component.file.remote.RemoteFileProducer.process(RemoteFileProducer.java:58)
	at org.apache.camel.impl.InterceptSendToEndpoint$1.process(InterceptSendToEndpoint.java:167)
	at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:173)
	at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:436)
	at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:168)
	at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110)
	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
	at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:695)
	at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:623)
	at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:247)
	at org.apache.camel.processor.Splitter.process(Splitter.java:114)
	at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110)
	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
	at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:47)
	at org.apache.camel.impl.DeferProducer.process(DeferProducer.java:72)
	at org.apache.camel.component.bean.AbstractCamelInvocationHandler$1.call(AbstractCamelInvocationHandler.java:192)
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	at java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	at java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.camel.component.file.GenericFileOperationFailedException: Cannot change directory to: [my_directory]
	at org.apache.camel.component.file.remote.SftpOperations.doChangeDirectory(SftpOperations.java:596)
	at org.apache.camel.component.file.remote.SftpOperations.changeCurrentDirectory(SftpOperations.java:584)
	at org.apache.camel.component.file.remote.SftpOperations.storeFile(SftpOperations.java:830)
	at org.apache.camel.component.file.GenericFileProducer.writeFile(GenericFileProducer.java:277)
	at org.apache.camel.component.file.GenericFileProducer.processExchange(GenericFileProducer.java:165)
	... 39 more
Caused by: 4:
	at com.jcraft.jsch.ChannelSftp.cd(ChannelSftp.java:359)
	at org.apache.camel.component.file.remote.SftpOperations.doChangeDirectory(SftpOperations.java:594)
	... 43 more
Caused by: java.io.IOException: Pipe closed
	at java.io.PipedInputStream.read(PipedInputStream.java:307)
	at com.jcraft.jsch.Channel$MyPipedInputStream.updateReadSide(Channel.java:362)
	at com.jcraft.jsch.ChannelSftp.cd(ChannelSftp.java:337)
	... 44 more
2018-02-08 15:05:26.186 DEBUG o.a.c.c.file.remote.RemoteFileProducer     : Exception occurred during stopping: Cannot change directory to: [my_directory]
{code}
So thread #35 stopped the producer, while thread #37 was trying to use it.

One more ugly thing about it, is that when SftpOperations fail due to a closed pipe, by the time it gets to RemoteFileProducer#handleFailedWrite:
{code}
public void handleFailedWrite(...) throws Exception {
    ...
    if (isStopping() || isStopped()) {
        // if we are stopping then ignore any exception during a poll
        log.debug("Exception occurred during stopping: " + exception.getMessage());
    } else {
        log.warn("Writing file failed with: " + exception.getMessage());
        ...
        throw exception;
    }
}
{code}
the producer is already stopped, *so the exception is logged on DEBUG and not rethrown*.

Note that I'm writing multiple files in parallel (three in this case), I'm using this to send data to the route ending in the SFTP endpoint:
{code}
@Produce(uri = "direct:myDir")
private MyDir myDir;
...
myDir.sendAsync(...)
{code}
where
{code}
public interface MyDir {
    Future<?> sendAsync(...);
}
{code}

We're using Camel 2.19.0, but so far that I've looked at the github repository, the issue is most likely present in the current version too.


> RemoteFileProducer stopped instead of being released to the pool when "interceptSendToEndpoint" is used
> -------------------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-12244
>                 URL: https://issues.apache.org/jira/browse/CAMEL-12244
>             Project: Camel
>          Issue Type: Bug
>    Affects Versions: 2.19.0
>            Reporter: Krzysztof Szafrański
>            Priority: Major
>
> In our application we're using an SFTP producer with "fileExist=Move" and a specific "moveExisting" expression. I encountered a problem where this would sometimes work, and sometimes not (i.e. there would be no ".archived" file). Upon further investigation I found the problem and it seems to be a bug in Camel.
> Our SFTP endpoint looks like this:
> {code:none}
> sftp://...:.../...?username=...&privateKeyPassphrase=...&privateKeyFile=...&useUserKnownHostsFile=false&jschLoggingLevel=ERROR&fileExist=Move&moveExisting=${file:name}.archived${date:now:yyyyMMddHHmmssSSS}
> {code}
> We also have an interceptor:
> {code:none}
> route.interceptSendToEndpoint("sftp://.*").process(exchange -> LOG.info("Sending file {} to {}", ...));
> {code}
> As I discovered, using the interceptor wraps the RemoteFileProducer with InterceptSendToEndpoint. This however changes the behavior of the ProducerCache:
> {code}
> public boolean doInAsyncProducer(...) {
>     ...
>     return producerCallback.doInAsyncProducer(producer, asyncProcessor, exchange, pattern, doneSync -> {
>         ...
>         if (producer instanceof ServicePoolAware) {
>             // release back to the pool
>             pool.release(endpoint, producer);
>         } else if (!producer.isSingleton()) {
>             // stop and shutdown non-singleton producers as we should not leak resources
>             try {
>                 ServiceHelper.stopAndShutdownService(producer);
>             } catch (Exception e) {
>                 ...
>             }
>         }
>         ...
>     });
>     ...
> }
> {code}
> RemoteFileProducer implements ServicePoolAware so it would normally go back to the pool, but InterceptSendToEndpoint _does not_. As a result, our producers keep getting stopped (note that RemoteFileProducer#isSingleton always returns false).
> What's more, somehow they _are_ being reused and in the end we run into situations, where one thread is closing a producer, while another thread is trying to write with it.
> I set up some breakpoints that log the thread name and System#identityHashCode of the producer:
> {code}
> 2018-02-08 15:05:25.070 TRACE o.a.c.c.file.remote.RemoteFileProducer     : Starting producer: RemoteFileProducer[...]
> 2018-02-08 15:05:25.073 TRACE o.a.c.c.file.remote.RemoteFileProducer     : Processing file: [my_file] for exchange: ...
> 2018-02-08 15:05:25.073 DEBUG o.a.c.c.file.remote.RemoteFileProducer     : Not already connected/logged in. Connecting to: ...
> doStop(), time: 1518098725112,  thread [Camel (camel-1) thread #35 - CamelInvocationHandler], producer: 889747012
> 	at org.apache.camel.component.file.remote.RemoteFileProducer.doStop(RemoteFileProducer.java:175)
> 	at org.apache.camel.support.ServiceSupport.stop(ServiceSupport.java:102)
> 	at org.apache.camel.util.ServiceHelper.stopService(ServiceHelper.java:142)
> 	at org.apache.camel.impl.InterceptSendToEndpoint$1.stop(InterceptSendToEndpoint.java:196)
> 	at org.apache.camel.support.ServiceSupport.shutdown(ServiceSupport.java:164)
> 	at org.apache.camel.util.ServiceHelper.stopAndShutdownService(ServiceHelper.java:211)
> 	at org.apache.camel.impl.ProducerCache.lambda$doInAsyncProducer$2(ProducerCache.java:450)
> 	at org.apache.camel.processor.SendProcessor$2$1.done(SendProcessor.java:178)
> 	at org.apache.camel.impl.InterceptSendToEndpoint$1.process(InterceptSendToEndpoint.java:171)
> 	at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:173)
> 	at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:436)
> 	at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:168)
> 	at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
> 	at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
> 	at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
> 	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110)
> 	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
> 	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
> 	at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
> 	at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
> 	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
> 	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
> 	at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:695)
> 	at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:623)
> 	at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:247)
> 	at org.apache.camel.processor.Splitter.process(Splitter.java:114)
> 	at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
> 	at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
> 	at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
> 	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110)
> 	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
> 	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
> 	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
> 	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
> 	at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:47)
> 	at org.apache.camel.impl.DeferProducer.process(DeferProducer.java:72)
> 	at org.apache.camel.component.bean.AbstractCamelInvocationHandler$1.call(AbstractCamelInvocationHandler.java:192)
> 	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java)
> 	at java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)
> 	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:748)
> 2018-02-08 15:05:25.966 DEBUG o.a.c.c.file.remote.RemoteFileProducer     : Connected and logged in to: ...
> 2018-02-08 15:05:25.966 DEBUG o.a.c.c.file.remote.RemoteFileProducer     : Disconnecting from: ...
> 2018-02-08 15:05:25.973 TRACE o.a.c.c.file.remote.RemoteFileProducer     : About to write [my_file] to [...] from exchange [...]
> 2018-02-08 15:05:25.974 TRACE o.a.c.c.file.remote.RemoteFileProducer     : Stopping producer: RemoteFileProducer[...]
> 2018-02-08 15:05:25.974 DEBUG o.a.c.c.file.remote.RemoteFileProducer     : Starting
> 2018-02-08 15:05:25.974 TRACE o.a.c.c.file.remote.RemoteFileProducer     : Starting producer: RemoteFileProducer[...]
> 2018-02-08 15:05:25.977 TRACE o.a.c.c.file.remote.RemoteFileProducer     : Processing file: [another_file] for exchange: Exchange[...]
> 2018-02-08 15:05:25.977 DEBUG o.a.c.c.file.remote.RemoteFileProducer     : Not already connected/logged in. Connecting to: ...
> handleFailedWrite(), time: 1518098726072, thread [Camel (camel-1) thread #37 - CamelInvocationHandler], producer: 889747012
> 	at org.apache.camel.component.file.remote.RemoteFileProducer.handleFailedWrite(RemoteFileProducer.java:81)
> 	at org.apache.camel.component.file.GenericFileProducer.processExchange(GenericFileProducer.java:227)
> 	at org.apache.camel.component.file.remote.RemoteFileProducer.process(RemoteFileProducer.java:58)
> 	at org.apache.camel.impl.InterceptSendToEndpoint$1.process(InterceptSendToEndpoint.java:167)
> 	at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:173)
> 	at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:436)
> 	at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:168)
> 	at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
> 	at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
> 	at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
> 	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110)
> 	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
> 	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
> 	at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
> 	at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
> 	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
> 	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
> 	at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:695)
> 	at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:623)
> 	at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:247)
> 	at org.apache.camel.processor.Splitter.process(Splitter.java:114)
> 	at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
> 	at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
> 	at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
> 	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110)
> 	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
> 	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
> 	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
> 	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
> 	at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:47)
> 	at org.apache.camel.impl.DeferProducer.process(DeferProducer.java:72)
> 	at org.apache.camel.component.bean.AbstractCamelInvocationHandler$1.call(AbstractCamelInvocationHandler.java:192)
> 	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java)
> 	at java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)
> 	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.camel.component.file.GenericFileOperationFailedException: Cannot change directory to: [my_directory]
> 	at org.apache.camel.component.file.remote.SftpOperations.doChangeDirectory(SftpOperations.java:596)
> 	at org.apache.camel.component.file.remote.SftpOperations.changeCurrentDirectory(SftpOperations.java:584)
> 	at org.apache.camel.component.file.remote.SftpOperations.storeFile(SftpOperations.java:830)
> 	at org.apache.camel.component.file.GenericFileProducer.writeFile(GenericFileProducer.java:277)
> 	at org.apache.camel.component.file.GenericFileProducer.processExchange(GenericFileProducer.java:165)
> 	... 39 more
> Caused by: 4:
> 	at com.jcraft.jsch.ChannelSftp.cd(ChannelSftp.java:359)
> 	at org.apache.camel.component.file.remote.SftpOperations.doChangeDirectory(SftpOperations.java:594)
> 	... 43 more
> Caused by: java.io.IOException: Pipe closed
> 	at java.io.PipedInputStream.read(PipedInputStream.java:307)
> 	at com.jcraft.jsch.Channel$MyPipedInputStream.updateReadSide(Channel.java:362)
> 	at com.jcraft.jsch.ChannelSftp.cd(ChannelSftp.java:337)
> 	... 44 more
> 2018-02-08 15:05:26.186 DEBUG o.a.c.c.file.remote.RemoteFileProducer     : Exception occurred during stopping: Cannot change directory to: [my_directory]
> {code}
> So thread #35 stopped the producer, while thread #37 was trying to use it.
> One more ugly thing about it, is that when SftpOperations fail due to a closed pipe, by the time it gets to RemoteFileProducer#handleFailedWrite:
> {code}
> public void handleFailedWrite(...) throws Exception {
>     ...
>     if (isStopping() || isStopped()) {
>         // if we are stopping then ignore any exception during a poll
>         log.debug("Exception occurred during stopping: " + exception.getMessage());
>     } else {
>         log.warn("Writing file failed with: " + exception.getMessage());
>         ...
>         throw exception;
>     }
> }
> {code}
> the producer is already stopped, *so the exception is logged on DEBUG and not rethrown*.
> Note that I'm writing multiple files in parallel (three in this case), I'm using this to send data to the route ending in the SFTP endpoint:
> {code}
> @Produce(uri = "direct:myDir")
> private MyDir myDir;
> ...
> myDir.sendAsync(...)
> {code}
> where
> {code}
> public interface MyDir {
>     Future<?> sendAsync(...);
> }
> {code}
> We're using Camel 2.19.0, but so far that I've looked at the github repository, the issue is most likely present in the current version too.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)