You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/11/27 07:49:39 UTC

[camel] branch master updated (9f7a8aa -> ca7183a)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 9f7a8aa  Upgrade Quickfixj to version 2.0.0
     new 21807de  CAMEL-12020: Added polling consumer for file/ftp components that poll on-demand and do not use a background scheduler.
     new c2e63a1  Skip test that fails with port number in use problem
     new ca7183a  Regen

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/camel/component/file/FileEndpoint.java  |  19 ++
 .../camel/component/file/GenericFileConsumer.java  |  19 +-
 .../component/file/GenericFilePollingConsumer.java | 229 +++++++++++++++++++++
 .../camel/impl/EventDrivenPollingConsumer.java     |  10 +-
 .../camel/component/test/TestFileSplitTest.java    |   2 +-
 .../camel/impl/ConsumerCacheZeroCapacityTest.java  |   2 +-
 .../enricher/PollEnrichBridgeErrorHandlerTest.java |   3 +-
 .../camel/component/file/remote/FtpEndpoint.java   |   2 +-
 .../component/file/remote/RemoteFileEndpoint.java  |  16 ++
 .../camel/component/file/remote/SftpEndpoint.java  |   1 -
 ...lEnrichConsumeWithDisconnectAndDeleteTest.java} |  11 +-
 .../SpringJettyNoConnectionRedeliveryTest.java     |   2 +
 .../camel-rss/src/main/docs/rss-component.adoc     |   4 +-
 13 files changed, 305 insertions(+), 15 deletions(-)
 create mode 100644 camel-core/src/main/java/org/apache/camel/component/file/GenericFilePollingConsumer.java
 copy components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/{SftpPollEnrichConsumeWithDisconnectAndDeleteTest.java => FtpPollEnrichConsumeWithDisconnectAndDeleteTest.java} (81%)

-- 
To stop receiving notification emails like this one, please contact
['"commits@camel.apache.org" <co...@camel.apache.org>'].

[camel] 03/03: Regen

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit ca7183ad1486decd9a8d64dffb95e9bfb5c69a15
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Nov 27 08:47:04 2017 +0100

    Regen
---
 components/camel-rss/src/main/docs/rss-component.adoc | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/components/camel-rss/src/main/docs/rss-component.adoc b/components/camel-rss/src/main/docs/rss-component.adoc
index da58055..bd3055c 100644
--- a/components/camel-rss/src/main/docs/rss-component.adoc
+++ b/components/camel-rss/src/main/docs/rss-component.adoc
@@ -1,4 +1,4 @@
-ls== RSS Component
+== RSS Component
 
 *Available as of Camel version 2.0*
 
@@ -173,4 +173,4 @@ return firstEntry.getTitle().contains("Camel");
 * link:endpoint.html[Endpoint]
 * link:getting-started.html[Getting Started]
 
-* link:atom.html[Atom]
+* link:atom.html[Atom]
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
"commits@camel.apache.org" <co...@camel.apache.org>.

[camel] 02/03: Skip test that fails with port number in use problem

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit c2e63a1c3d2f07de25d0dd3079930f64327067ad
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Nov 26 19:38:27 2017 +0100

    Skip test that fails with port number in use problem
---
 .../camel/component/jetty/SpringJettyNoConnectionRedeliveryTest.java    | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/components/camel-jetty9/src/test/java/org/apache/camel/component/jetty/SpringJettyNoConnectionRedeliveryTest.java b/components/camel-jetty9/src/test/java/org/apache/camel/component/jetty/SpringJettyNoConnectionRedeliveryTest.java
index 3b26744..d0f8b2f 100644
--- a/components/camel-jetty9/src/test/java/org/apache/camel/component/jetty/SpringJettyNoConnectionRedeliveryTest.java
+++ b/components/camel-jetty9/src/test/java/org/apache/camel/component/jetty/SpringJettyNoConnectionRedeliveryTest.java
@@ -21,6 +21,7 @@ import java.net.ConnectException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.test.spring.CamelSpringTestSupport;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.springframework.context.support.AbstractXmlApplicationContext;
 import org.springframework.context.support.ClassPathXmlApplicationContext;
@@ -28,6 +29,7 @@ import org.springframework.context.support.ClassPathXmlApplicationContext;
 /**
  * @version
  */
+@Ignore("Fails with Address already in use")
 public class SpringJettyNoConnectionRedeliveryTest extends CamelSpringTestSupport {
 
     @Override

-- 
To stop receiving notification emails like this one, please contact
"commits@camel.apache.org" <co...@camel.apache.org>.

[camel] 01/03: CAMEL-12020: Added polling consumer for file/ftp components that poll on-demand and do not use a background scheduler.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 21807defcfcbc40feafd80de68c5e0601a893ff8
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Nov 26 19:37:26 2017 +0100

    CAMEL-12020: Added polling consumer for file/ftp components that poll on-demand and do not use a background scheduler.
---
 .../apache/camel/component/file/FileEndpoint.java  |  19 ++
 .../camel/component/file/GenericFileConsumer.java  |  19 +-
 .../component/file/GenericFilePollingConsumer.java | 229 +++++++++++++++++++++
 .../camel/impl/EventDrivenPollingConsumer.java     |  10 +-
 .../camel/component/test/TestFileSplitTest.java    |   2 +-
 .../camel/impl/ConsumerCacheZeroCapacityTest.java  |   2 +-
 .../enricher/PollEnrichBridgeErrorHandlerTest.java |   3 +-
 .../camel/component/file/remote/FtpEndpoint.java   |   2 +-
 .../component/file/remote/RemoteFileEndpoint.java  |  16 ++
 .../camel/component/file/remote/SftpEndpoint.java  |   1 -
 ...llEnrichConsumeWithDisconnectAndDeleteTest.java |  77 +++++++
 11 files changed, 373 insertions(+), 7 deletions(-)

diff --git a/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
index 4b61d6c..9bfdc41 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
@@ -26,7 +26,9 @@ import java.util.Set;
 import org.apache.camel.Component;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
+import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
+import org.apache.camel.impl.EventDrivenPollingConsumer;
 import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriEndpoint;
@@ -119,6 +121,23 @@ public class FileEndpoint extends GenericFileEndpoint<File> {
         return result;
     }
 
+    @Override
+    public PollingConsumer createPollingConsumer() throws Exception {
+        ObjectHelper.notNull(operations, "operations");
+        ObjectHelper.notNull(file, "file");
+
+        if (log.isDebugEnabled()) {
+            log.debug("Creating GenericFilePollingConsumer with queueSize: {} blockWhenFull: {} blockTimeout: {}",
+                getPollingConsumerQueueSize(), isPollingConsumerBlockWhenFull(), getPollingConsumerBlockTimeout());
+        }
+        GenericFilePollingConsumer result = new GenericFilePollingConsumer(this);
+        // should not call configurePollingConsumer when its GenericFilePollingConsumer
+        result.setBlockWhenFull(isPollingConsumerBlockWhenFull());
+        result.setBlockTimeout(getPollingConsumerBlockTimeout());
+
+        return result;
+    }
+
     public GenericFileProducer<File> createProducer() throws Exception {
         ObjectHelper.notNull(operations, "operations");
 
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
index b6d7d67..619bf6b 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
@@ -93,7 +93,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
     /**
      * Poll for files
      */
-    protected int poll() throws Exception {
+    public int poll() throws Exception {
         // must prepare on startup the very first time
         if (!prepareOnStartup) {
             // prepare on startup
@@ -720,4 +720,21 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
         prepareOnStartup = false;
         super.doStop();
     }
+
+    @Override
+    public void onInit() throws Exception {
+        // noop as we do a manual on-demand poll with GenericFilePolllingConsumer
+    }
+
+    @Override
+    public long beforePoll(long timeout) throws Exception {
+        // noop as we do a manual on-demand poll with GenericFilePolllingConsumer
+        return timeout;
+    }
+
+    @Override
+    public void afterPoll() throws Exception {
+        // noop as we do a manual on-demand poll with GenericFilePolllingConsumer
+    }
+
 }
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFilePollingConsumer.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFilePollingConsumer.java
new file mode 100644
index 0000000..09f6bcf
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFilePollingConsumer.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.EventDrivenPollingConsumer;
+import org.apache.camel.impl.ScheduledBatchPollingConsumer;
+import org.apache.camel.spi.PollingConsumerPollStrategy;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+import org.apache.camel.util.StopWatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GenericFilePollingConsumer extends EventDrivenPollingConsumer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(GenericFilePollingConsumer.class);
+
+    private final long delay;
+
+    public GenericFilePollingConsumer(GenericFileEndpoint endpoint) throws Exception {
+        super(endpoint);
+        this.delay = endpoint.getDelay();
+    }
+
+    @Override
+    protected Consumer createConsumer() throws Exception {
+        // lets add ourselves as a consumer
+        GenericFileConsumer consumer = (GenericFileConsumer) super.createConsumer();
+        // do not start scheduler as we poll manually
+        consumer.setStartScheduler(false);
+        consumer.setMaxMessagesPerPoll(1);
+        // we only want to poll once so disconnect by default
+        return consumer;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        // ensure consumer is started
+        ServiceHelper.startService(getConsumer());
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        super.doShutdown();
+    }
+
+    @Override
+    protected GenericFileConsumer getConsumer() {
+        return (GenericFileConsumer) super.getConsumer();
+    }
+
+    @Override
+    public Exchange receiveNoWait() {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("receiveNoWait polling file: {}", getConsumer().getEndpoint());
+        }
+        int polled = doReceive(0);
+        if (polled > 0) {
+            return super.receive(0);
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public Exchange receive() {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("receive polling file: {}", getConsumer().getEndpoint());
+        }
+        int polled = doReceive(Long.MAX_VALUE);
+        if (polled > 0) {
+            return super.receive();
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public Exchange receive(long timeout) {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("receive({}) polling file: {}", timeout, getConsumer().getEndpoint());
+        }
+        int polled = doReceive(timeout);
+        if (polled > 0) {
+            return super.receive(timeout);
+        } else {
+            return null;
+        }
+    }
+
+    protected int doReceive(long timeout) {
+        int retryCounter = -1;
+        boolean done = false;
+        Throwable cause = null;
+        int polledMessages = 0;
+        PollingConsumerPollStrategy pollStrategy = getConsumer().getPollStrategy();
+        boolean sendEmptyMessageWhenIdle = getConsumer() instanceof ScheduledBatchPollingConsumer && getConsumer().isSendEmptyMessageWhenIdle();
+        StopWatch watch = new StopWatch();
+
+        while (!done) {
+            try {
+                cause = null;
+                // eager assume we are done
+                done = true;
+                if (isRunAllowed()) {
+
+                    if (retryCounter == -1) {
+                        LOG.trace("Starting to poll: {}", this.getEndpoint());
+                    } else {
+                        LOG.debug("Retrying attempt {} to poll: {}", retryCounter, this.getEndpoint());
+                    }
+
+                    // mark we are polling which should also include the begin/poll/commit
+                    boolean begin = pollStrategy.begin(this, getEndpoint());
+                    if (begin) {
+                        retryCounter++;
+                        polledMessages = getConsumer().poll();
+                        LOG.trace("Polled {} messages", polledMessages);
+
+                        if (polledMessages == 0 && sendEmptyMessageWhenIdle) {
+                            // send an "empty" exchange
+                            processEmptyMessage();
+                        } else if (polledMessages == 0 && timeout > 0) {
+                            // if we did not poll a file and we are using timeout then try to poll again
+                            done = false;
+                        }
+
+                        pollStrategy.commit(this, getEndpoint(), polledMessages);
+                    } else {
+                        LOG.debug("Cannot begin polling as pollStrategy returned false: {}", pollStrategy);
+                    }
+                }
+
+                LOG.trace("Finished polling: {}", this.getEndpoint());
+            } catch (Exception e) {
+                try {
+                    boolean retry = pollStrategy.rollback(this, getEndpoint(), retryCounter, e);
+                    if (retry) {
+                        // do not set cause as we retry
+                        done = false;
+                    } else {
+                        cause = e;
+                        done = true;
+                    }
+                } catch (Throwable t) {
+                    cause = t;
+                    done = true;
+                }
+            } catch (Throwable t) {
+                cause = t;
+                done = true;
+            }
+
+            if (!done && timeout > 0) {
+                // prepare for next attempt until we hit timeout
+                long left = timeout - watch.taken();
+                long min = Math.min(left, delay);
+                if (min > 0) {
+                    try {
+                        // sleep for next pool
+                        sleep(min);
+                    } catch (InterruptedException e) {
+                        // ignore
+                    }
+                } else {
+                    // timeout hit
+                    done = true;
+                }
+            }
+        }
+
+        if (cause != null) {
+            throw ObjectHelper.wrapRuntimeCamelException(cause);
+        }
+
+        return polledMessages;
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        Object name = exchange.getIn().getHeader(Exchange.FILE_NAME);
+        if (name != null) {
+            LOG.debug("Received file: {}", name);
+        }
+        super.process(exchange);
+    }
+
+    /**
+     * No messages to poll so send an empty message instead.
+     *
+     * @throws Exception is thrown if error processing the empty message.
+     */
+    protected void processEmptyMessage() throws Exception {
+        Exchange exchange = getEndpoint().createExchange();
+        log.debug("Sending empty message as there were no messages from polling: {}", this.getEndpoint());
+        process(exchange);
+    }
+
+    private static void sleep(long delay) throws InterruptedException {
+        if (delay <= 0) {
+            return;
+        }
+        LOG.trace("Sleeping for: {} millis", delay);
+        Thread.sleep(delay);
+    }
+
+}
diff --git a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
index df00e0d..066abf4 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
@@ -211,9 +211,17 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement
         }
     }
 
+    protected Consumer getConsumer() {
+        return consumer;
+    }
+
+    protected Consumer createConsumer() throws Exception {
+        return getEndpoint().createConsumer(this);
+    }
+
     protected void doStart() throws Exception {
         // lets add ourselves as a consumer
-        consumer = getEndpoint().createConsumer(this);
+        consumer = createConsumer();
 
         // if the consumer has a polling strategy then invoke that
         if (consumer instanceof PollingConsumerPollingStrategy) {
diff --git a/camel-core/src/test/java/org/apache/camel/component/test/TestFileSplitTest.java b/camel-core/src/test/java/org/apache/camel/component/test/TestFileSplitTest.java
index 648e571..1732b0f 100644
--- a/camel-core/src/test/java/org/apache/camel/component/test/TestFileSplitTest.java
+++ b/camel-core/src/test/java/org/apache/camel/component/test/TestFileSplitTest.java
@@ -41,7 +41,7 @@ public class TestFileSplitTest extends ContextTestSupport {
             @Override
             public void configure() throws Exception {
                 from("direct:start")
-                        .to("test:file:target/testme?noop=true&split=true");
+                        .to("test:file:target/testme?noop=true&split=true&timeout=1000");
             }
         });
         context.start();
diff --git a/camel-core/src/test/java/org/apache/camel/impl/ConsumerCacheZeroCapacityTest.java b/camel-core/src/test/java/org/apache/camel/impl/ConsumerCacheZeroCapacityTest.java
index 30f38a8..31cd3c1 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/ConsumerCacheZeroCapacityTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/ConsumerCacheZeroCapacityTest.java
@@ -45,7 +45,7 @@ public class ConsumerCacheZeroCapacityTest extends ContextTestSupport {
         consumer.receive(50);
 
         boolean found = Thread.getAllStackTraces().keySet().stream().anyMatch(t -> t.getName().contains("target/foo"));
-        assertTrue("Should find file consumer thread", found);
+        assertFalse("Should not find file consumer thread", found);
 
         cache.releasePollingConsumer(endpoint, consumer);
 
diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichBridgeErrorHandlerTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichBridgeErrorHandlerTest.java
index 4d6d287..3b417c5 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichBridgeErrorHandlerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichBridgeErrorHandlerTest.java
@@ -50,7 +50,8 @@ public class PollEnrichBridgeErrorHandlerTest extends ContextTestSupport {
 
         Exception caught = getMockEndpoint("mock:dead").getExchanges().get(0).getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
         assertNotNull(caught);
-        assertEquals("Something went wrong", caught.getMessage());
+        assertTrue(caught.getMessage().startsWith("Error during poll"));
+        assertEquals("Something went wrong", caught.getCause().getCause().getMessage());
     }
 
     @Override
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java
index 45406f8..846500e 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java
@@ -83,7 +83,7 @@ public class FtpEndpoint<T extends FTPFile> extends RemoteFileEndpoint<FTPFile>
             throw new FailedToCreateProducerException(this, e);
         }
     }
-    
+
     public RemoteFileOperations<FTPFile> createRemoteFileOperations() throws Exception {
         // configure ftp client
         FTPClient client = ftpClient;
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
index 884678a..a037afd 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
@@ -19,10 +19,12 @@ package org.apache.camel.component.file.remote;
 import java.util.Map;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
 import org.apache.camel.component.file.GenericFile;
 import org.apache.camel.component.file.GenericFileEndpoint;
 import org.apache.camel.component.file.GenericFileExist;
+import org.apache.camel.component.file.GenericFilePollingConsumer;
 import org.apache.camel.component.file.GenericFileProducer;
 import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
 import org.apache.camel.spi.UriParam;
@@ -128,6 +130,20 @@ public abstract class RemoteFileEndpoint<T> extends GenericFileEndpoint<T> {
         return consumer;
     }
 
+    @Override
+    public PollingConsumer createPollingConsumer() throws Exception {
+        if (log.isDebugEnabled()) {
+            log.debug("Creating GenericFilePollingConsumer with queueSize: {} blockWhenFull: {} blockTimeout: {}",
+                getPollingConsumerQueueSize(), isPollingConsumerBlockWhenFull(), getPollingConsumerBlockTimeout());
+        }
+        GenericFilePollingConsumer result = new GenericFilePollingConsumer(this);
+        // should not call configurePollingConsumer when its GenericFilePollingConsumer
+        result.setBlockWhenFull(isPollingConsumerBlockWhenFull());
+        result.setBlockTimeout(getPollingConsumerBlockTimeout());
+
+        return result;
+    }
+
     /**
      * Validates this endpoint if its configured properly.
      *
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java
index 8bfeb4f..42341bc 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.component.file.remote;
 
-import com.jcraft.jsch.ChannelSftp;
 import com.jcraft.jsch.Proxy;
 import org.apache.camel.Processor;
 import org.apache.camel.component.file.GenericFileConfiguration;
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpPollEnrichConsumeWithDisconnectAndDeleteTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpPollEnrichConsumeWithDisconnectAndDeleteTest.java
new file mode 100644
index 0000000..258cb66
--- /dev/null
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpPollEnrichConsumeWithDisconnectAndDeleteTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import java.io.File;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+public class FtpPollEnrichConsumeWithDisconnectAndDeleteTest extends FtpServerTestSupport {
+
+    @Test
+    public void testFtpSimpleConsume() throws Exception {
+        if (!canTest()) {
+            return;
+        }
+
+        String expected = "Hello World";
+
+        // create file using regular file
+        template.sendBodyAndHeader("file://" + FTP_ROOT_DIR + "/poll", expected, Exchange.FILE_NAME, "hello.txt");
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+        mock.expectedHeaderReceived(Exchange.FILE_NAME, "hello.txt");
+        mock.expectedBodiesReceived(expected);
+
+        ProducerTemplate triggerTemplate = context.createProducerTemplate();
+        triggerTemplate.sendBody("vm:trigger", "");
+
+        assertMockEndpointsSatisfied();
+
+        long startFileDeletionCheckTime = System.currentTimeMillis();
+        boolean fileExists = true;
+        while (System.currentTimeMillis() - startFileDeletionCheckTime < 3000) {  // wait up to 3000ms for file to be deleted
+            File file = new File(FTP_ROOT_DIR + "/poll/hello.txt");
+            fileExists = file.exists();
+
+            if (fileExists) {
+                log.info("Will check that file has been deleted again in 200ms");
+                Thread.sleep(200);
+            }
+        }
+
+        assertFalse("The file should have been deleted", fileExists);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("vm:trigger")
+                    .pollEnrich("ftp://admin@localhost:" + getPort() + "/poll?password=admin&delete=true")
+                    .routeId("foo")
+                    .to("mock:result");
+            }
+        };
+    }
+}
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
"commits@camel.apache.org" <co...@camel.apache.org>.