You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/09/05 13:00:58 UTC

[1/2] git commit: CAMEL-6671: FTP consumer. Added useList and ignoreFileNotFound options to allow to download a single file without using FTP LIST command which is needed in some use-cases, such as user has no permission to do FTP LIST.

Updated Branches:
  refs/heads/camel-2.12.x b56b1c56d -> c948c2b8d
  refs/heads/master f4efabae3 -> d99d5940d


CAMEL-6671: FTP consumer. Added useList and ignoreFileNotFound options to allow to download a single file without using FTP LIST command which is needed in some use-cases, such as user has no permission to do FTP LIST.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d99d5940
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d99d5940
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d99d5940

Branch: refs/heads/master
Commit: d99d5940dd1a3d0a48ebd3382b57d2cfdb422008
Parents: f4efaba
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Sep 5 12:44:16 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Sep 5 12:49:26 2013 +0200

----------------------------------------------------------------------
 .../component/file/GenericFileConsumer.java     | 64 +++++++++++++----
 .../camel/impl/ScheduledPollConsumer.java       |  1 +
 .../component/file/remote/FtpConsumer.java      | 40 +++++++++--
 .../file/remote/RemoteFileConfiguration.java    | 35 ++++++++-
 .../file/remote/RemoteFileConsumer.java         |  4 +-
 .../component/file/remote/SftpConsumer.java     |  1 +
 .../file/remote/FromFtpUseListFalseTest.java    | 75 ++++++++++++++++++++
 .../FtpConsumerTemplateUseListFalseTest.java    | 74 +++++++++++++++++++
 8 files changed, 271 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d99d5940/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
index 0f8be8e..c55755e 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
@@ -171,6 +171,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
 
     public int processBatch(Queue<Object> exchanges) {
         int total = exchanges.size();
+        int answer = total;
 
         // limit if needed
         if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) {
@@ -191,19 +192,25 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
             pendingExchanges = total - index - 1;
 
             // process the current exchange
+            boolean started;
             if (customProcessor != null) {
                 // use a custom processor
-                customProcessExchange(exchange, customProcessor);
+                started = customProcessExchange(exchange, customProcessor);
             } else {
                 // process the exchange regular
-                processExchange(exchange);
+                started = processExchange(exchange);
+            }
+
+            // if we did not start process the file then decremember the counter
+            if (!started) {
+                answer--;
             }
         }
 
         // drain any in progress files as we are done with this batch
         removeExcessiveInProgressFiles(CastUtils.cast((Deque<?>) exchanges, Exchange.class), 0);
 
-        return total;
+        return answer;
     }
 
     protected void removeExcessiveInProgressFiles(Deque<Exchange> exchanges, int limit) {
@@ -277,11 +284,28 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
     }
 
     /**
+     * Whether to ignore if the file cannot be retrieved.
+     * <p/>
+     * By default an {@link GenericFileOperationFailedException} is thrown if the file cannot be retrieved.
+     * <p/>
+     * This method allows to suppress this and just ignore that.
+     *
+     * @param name        the file name
+     * @param exchange    the exchange
+     * @return <tt>true</tt> to ignore, <tt>false</tt> is the default.
+     */
+    protected boolean ignoreCannotRetrieveFile(String name, Exchange exchange) {
+        return false;
+    }
+
+    /**
      * Processes the exchange
      *
      * @param exchange the exchange
+     * @return <tt>true</tt> if the file was started to be processed, <tt>false</tt> if the file was not started
+     * to be processed, for some reason (not found, or aborted etc)
      */
-    protected void processExchange(final Exchange exchange) {
+    protected boolean processExchange(final Exchange exchange) {
         GenericFile<T> file = getExchangeFileProperty(exchange);
         log.trace("Processing file: {}", file);
 
@@ -303,7 +327,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
                     // begin returned false, so remove file from the in progress list as its no longer in progress
                     endpoint.getInProgressRepository().remove(absoluteFileName);
                 }
-                return;
+                return false;
             }
         } catch (Exception e) {
             // remove file from the in progress list due to failure
@@ -311,7 +335,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
 
             String msg = endpoint + " cannot begin processing file: " + file + " due to: " + e.getMessage();
             handleException(msg, e);
-            return;
+            return false;
         }
 
         // must use file from exchange as it can be updated due the
@@ -328,10 +352,17 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
                 // retrieve the file and check it was a success
                 boolean retrieved = operations.retrieveFile(name, exchange);
                 if (!retrieved) {
-                    // throw exception to handle the problem with retrieving the file
-                    // then if the method return false or throws an exception is handled the same in here
-                    // as in both cases an exception is being thrown
-                    throw new GenericFileOperationFailedException("Cannot retrieve file: " + file + " from: " + endpoint);
+                    if (ignoreCannotRetrieveFile(name, exchange)) {
+                        log.trace("Cannot retrieve file {} maybe it does not exists. Ignorning.", name);
+                        // remove file from the in progress list as we could not retrieve it, but should ignore
+                        endpoint.getInProgressRepository().remove(absoluteFileName);
+                        return false;
+                    } else {
+                        // throw exception to handle the problem with retrieving the file
+                        // then if the method return false or throws an exception is handled the same in here
+                        // as in both cases an exception is being thrown
+                        throw new GenericFileOperationFailedException("Cannot retrieve file: " + file + " from: " + endpoint);
+                    }
                 }
     
                 log.trace("Retrieved file: {} from: {}", name, endpoint);                
@@ -368,6 +399,8 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
             String msg = "Error processing file " + file + " due to " + e.getMessage();
             handleException(msg, e);
         }
+
+        return true;
     }
 
     /**
@@ -385,7 +418,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
      * @param exchange the exchange
      * @param processor the custom processor
      */
-    protected void customProcessExchange(final Exchange exchange, final Processor processor) {
+    protected boolean customProcessExchange(final Exchange exchange, final Processor processor) {
         GenericFile<T> file = getExchangeFileProperty(exchange);
         log.trace("Custom processing file: {}", file);
 
@@ -407,6 +440,8 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
             // as the name can be different when using preMove option
             endpoint.getInProgressRepository().remove(absoluteFileName);
         }
+
+        return true;
     }
 
     /**
@@ -508,7 +543,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
 
         // use file expression for a simple dynamic file filter
         if (endpoint.getFileName() != null) {
-            evaluateFileExpression();
+            fileExpressionResult = evaluateFileExpression();
             if (fileExpressionResult != null) {
                 if (!name.equals(fileExpressionResult)) {
                     return false;
@@ -557,12 +592,13 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
         return !endpoint.getInProgressRepository().add(key);
     }
 
-    private void evaluateFileExpression() {
-        if (fileExpressionResult == null) {
+    protected String evaluateFileExpression() {
+        if (fileExpressionResult == null && endpoint.getFileName() != null) {
             // create a dummy exchange as Exchange is needed for expression evaluation
             Exchange dummy = endpoint.createExchange();
             fileExpressionResult = endpoint.getFileName().evaluate(dummy, String.class);
         }
+        return fileExpressionResult;
     }
 
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/camel/blob/d99d5940/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
index 6b34ac7..98b9e42 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
@@ -185,6 +185,7 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R
                         if (begin) {
                             retryCounter++;
                             polledMessages = poll();
+                            LOG.trace("Polled {} messages", polledMessages);
 
                             if (polledMessages == 0 && isSendEmptyMessageWhenIdle()) {
                                 // send an "empty" exchange

http://git-wip-us.apache.org/repos/asf/camel/blob/d99d5940/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
index 7b042ea..5d7dca5 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
@@ -16,8 +16,10 @@
  */
 package org.apache.camel.component.file.remote;
 
+import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.component.file.GenericFile;
 import org.apache.camel.util.FileUtil;
@@ -83,11 +85,24 @@ public class FtpConsumer extends RemoteFileConsumer<FTPFile> {
         }
 
         log.trace("Polling directory: {}", dir);
-        List<FTPFile> files;
-        if (isStepwise()) {
-            files = operations.listFiles();
+        List<FTPFile> files = null;
+        if (isUseList()) {
+            if (isStepwise()) {
+                files = operations.listFiles();
+            } else {
+                files = operations.listFiles(dir);
+            }
         } else {
-            files = operations.listFiles(dir);
+            // we cannot use the LIST command(s) so we can only poll a named file
+            // so created a pseudo file with that name
+            FTPFile file = new FTPFile();
+            file.setType(FTPFile.FILE_TYPE);
+            fileExpressionResult = evaluateFileExpression();
+            if (fileExpressionResult != null) {
+                file.setName(fileExpressionResult);
+                files = new ArrayList<FTPFile>(1);
+                files.add(file);
+            }
         }
 
         if (files == null || files.isEmpty()) {
@@ -149,6 +164,18 @@ public class FtpConsumer extends RemoteFileConsumer<FTPFile> {
         return false;
     }
 
+    @Override
+    protected boolean ignoreCannotRetrieveFile(String name, Exchange exchange) {
+        if (getEndpoint().getConfiguration().isIgnoreFileNotFound()) {
+            // error code 550 is file not found
+            int code = exchange.getIn().getHeader(FtpConstants.FTP_REPLY_CODE, 0, int.class);
+            if (code == 550) {
+                return true;
+            }
+        }
+        return super.ignoreCannotRetrieveFile(name, exchange);
+    }
+
     private RemoteFile<FTPFile> asRemoteFile(String absolutePath, FTPFile file) {
         RemoteFile<FTPFile> answer = new RemoteFile<FTPFile>();
 
@@ -193,6 +220,11 @@ public class FtpConsumer extends RemoteFileConsumer<FTPFile> {
         return config.isStepwise();
     }
 
+    private boolean isUseList() {
+        RemoteFileConfiguration config = (RemoteFileConfiguration) endpoint.getConfiguration();
+        return config.isUseList();
+    }
+
     @Override
     public String toString() {
         return "FtpConsumer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]";

http://git-wip-us.apache.org/repos/asf/camel/blob/d99d5940/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java
index e5325ba..bbe0a18 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java
@@ -33,7 +33,7 @@ public abstract class RemoteFileConfiguration extends GenericFileConfiguration {
      * Windows = Path separator \ is used
      * Auto = Use existing path separator in file name
      */
-    public enum PathSeparator { UNIX, Windows, Auto };
+    public enum PathSeparator { UNIX, Windows, Auto }
 
     private String protocol;
     private String username;
@@ -50,6 +50,8 @@ public abstract class RemoteFileConfiguration extends GenericFileConfiguration {
     private boolean stepwise = true;
     private PathSeparator separator = PathSeparator.Auto;
     private boolean streamDownload;
+    private boolean useList = true;
+    private boolean ignoreFileNotFound;
 
     public RemoteFileConfiguration() {
     }
@@ -270,13 +272,40 @@ public abstract class RemoteFileConfiguration extends GenericFileConfiguration {
      * Sets the download method to use when not using a local working directory.  If set to true,
      * the remote files are streamed to the route as they are read.  When set to false, the remote files
      * are loaded into memory before being sent into the route.
-     *
-     * @param streamDownload 
      */
     public void setStreamDownload(boolean streamDownload) {
         this.streamDownload = streamDownload;
     }
 
+    public boolean isUseList() {
+        return useList;
+    }
+
+    /**
+     * Whether to allow using LIST command when downloading a file.
+     * <p/>
+     * Default is <tt>true</tt>. In some use cases you may want to download
+     * a specific file and are not allowed to use the LIST command, and therefore
+     * you can set this option to <tt>false</tt>.
+     */
+    public void setUseList(boolean useList) {
+        this.useList = useList;
+    }
+
+    public boolean isIgnoreFileNotFound() {
+        return ignoreFileNotFound;
+    }
+
+    /**
+     * Whether to ignore when trying to download a file which does not exist.
+     * <p/>
+     * By default when a file does not exists, then an exception is thrown.
+     * Setting this option to <tt>true</tt> allows to ignore that instead.
+     */
+    public void setIgnoreFileNotFound(boolean ignoreFileNotFound) {
+        this.ignoreFileNotFound = ignoreFileNotFound;
+    }
+
     /**
      * Normalizes the given path according to the configured path separator.
      *

http://git-wip-us.apache.org/repos/asf/camel/blob/d99d5940/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
index 39e23ce..de242da 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
@@ -92,11 +92,11 @@ public abstract class RemoteFileConsumer<T> extends GenericFileConsumer<T> {
     }
 
     @Override
-    protected void processExchange(Exchange exchange) {
+    protected boolean processExchange(Exchange exchange) {
         // mark the exchange to be processed synchronously as the ftp client is not thread safe
         // and we must execute the callbacks in the same thread as this consumer
         exchange.setProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC, Boolean.TRUE);
-        super.processExchange(exchange);
+        return super.processExchange(exchange);
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/d99d5940/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
index 1dbfd9d..765aac4 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.file.remote;
 import java.util.List;
 
 import com.jcraft.jsch.ChannelSftp;
+import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.component.file.GenericFile;
 import org.apache.camel.util.FileUtil;

http://git-wip-us.apache.org/repos/asf/camel/blob/d99d5940/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpUseListFalseTest.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpUseListFalseTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpUseListFalseTest.java
new file mode 100644
index 0000000..afd2763
--- /dev/null
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpUseListFalseTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test to poll a fixed file from the FTP server without using the list command.
+ */
+public class FromFtpUseListFalseTest extends FtpServerTestSupport {
+
+    private String getFtpUrl() {
+        return "ftp://admin@localhost:" + getPort() + "/nolist/?password=admin"
+                + "&stepwise=false&useList=false&ignoreFileNotFound=true&fileName=report.txt&delete=true";
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        prepareFtpServer();
+    }
+
+    @Test
+    public void testUseListFalse() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World from FTPServer");
+
+        // just allow to poll a few more times, but we should only get the file once
+        Thread.sleep(1000);
+
+        mock.assertIsSatisfied();
+    }
+    
+    private void prepareFtpServer() throws Exception {
+        // prepares the FTP Server by creating a file on the server that we want to unit
+        // test that we can pool and store as a local file
+        Endpoint endpoint = context.getEndpoint("ftp://admin@localhost:" + getPort() + "/nolist/?password=admin&binary=false");
+        Exchange exchange = endpoint.createExchange();
+        exchange.getIn().setBody("Hello World from FTPServer");
+        exchange.getIn().setHeader(Exchange.FILE_NAME, "report.txt");
+        Producer producer = endpoint.createProducer();
+        producer.start();
+        producer.process(exchange);
+        producer.stop();
+    }
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from(getFtpUrl()).to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/d99d5940/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerTemplateUseListFalseTest.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerTemplateUseListFalseTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerTemplateUseListFalseTest.java
new file mode 100644
index 0000000..6554e62
--- /dev/null
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerTemplateUseListFalseTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Producer;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test to poll a fixed file from the FTP server without using the list command.
+ */
+public class FtpConsumerTemplateUseListFalseTest extends FtpServerTestSupport {
+
+    private String getFtpUrl() {
+        return "ftp://admin@localhost:" + getPort() + "/nolist/?password=admin"
+                + "&stepwise=false&useList=false&ignoreFileNotFound=true&delete=true";
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        prepareFtpServer();
+    }
+
+    @Test
+    public void testUseListFalse() throws Exception {
+        String data = consumer.receiveBody(getFtpUrl() + "&fileName=report.txt", 5000, String.class);
+        assertEquals("Hello World from FTPServer", data);
+
+        // try a 2nd time and the file is deleted
+        data = consumer.receiveBody(getFtpUrl() + "&fileName=report.txt", 1000, String.class);
+        assertNull("The file should no longer exist", data);
+
+        // and try a non existing file name
+        data = consumer.receiveBody(getFtpUrl() + "&fileName=report2.txt", 1000, String.class);
+        assertNull("The file should no longer exist", data);
+    }
+    
+    private void prepareFtpServer() throws Exception {
+        // prepares the FTP Server by creating a file on the server that we want to unit
+        // test that we can pool and store as a local file
+        Endpoint endpoint = context.getEndpoint("ftp://admin@localhost:" + getPort() + "/nolist/?password=admin&binary=false");
+        Exchange exchange = endpoint.createExchange();
+        exchange.getIn().setBody("Hello World from FTPServer");
+        exchange.getIn().setHeader(Exchange.FILE_NAME, "report.txt");
+        Producer producer = endpoint.createProducer();
+        producer.start();
+        producer.process(exchange);
+        producer.stop();
+    }
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+}


[2/2] git commit: CAMEL-6671: FTP consumer. Added useList and ignoreFileNotFound options to allow to download a single file without using FTP LIST command which is needed in some use-cases, such as user has no permission to do FTP LIST.

Posted by da...@apache.org.
CAMEL-6671: FTP consumer. Added useList and ignoreFileNotFound options to allow to download a single file without using FTP LIST command which is needed in some use-cases, such as user has no permission to do FTP LIST.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c948c2b8
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c948c2b8
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c948c2b8

Branch: refs/heads/camel-2.12.x
Commit: c948c2b8d237c1b191ede44491160b12478c027b
Parents: b56b1c5
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Sep 5 12:44:16 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Sep 5 13:00:43 2013 +0200

----------------------------------------------------------------------
 .../component/file/GenericFileConsumer.java     | 64 +++++++++++++----
 .../camel/impl/ScheduledPollConsumer.java       |  1 +
 .../component/file/remote/FtpConsumer.java      | 40 +++++++++--
 .../file/remote/RemoteFileConfiguration.java    | 35 ++++++++-
 .../file/remote/RemoteFileConsumer.java         |  4 +-
 .../component/file/remote/SftpConsumer.java     |  1 +
 .../file/remote/FromFtpUseListFalseTest.java    | 75 ++++++++++++++++++++
 .../FtpConsumerTemplateUseListFalseTest.java    | 74 +++++++++++++++++++
 8 files changed, 271 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/c948c2b8/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
index 0f8be8e..c55755e 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
@@ -171,6 +171,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
 
     public int processBatch(Queue<Object> exchanges) {
         int total = exchanges.size();
+        int answer = total;
 
         // limit if needed
         if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) {
@@ -191,19 +192,25 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
             pendingExchanges = total - index - 1;
 
             // process the current exchange
+            boolean started;
             if (customProcessor != null) {
                 // use a custom processor
-                customProcessExchange(exchange, customProcessor);
+                started = customProcessExchange(exchange, customProcessor);
             } else {
                 // process the exchange regular
-                processExchange(exchange);
+                started = processExchange(exchange);
+            }
+
+            // if we did not start process the file then decremember the counter
+            if (!started) {
+                answer--;
             }
         }
 
         // drain any in progress files as we are done with this batch
         removeExcessiveInProgressFiles(CastUtils.cast((Deque<?>) exchanges, Exchange.class), 0);
 
-        return total;
+        return answer;
     }
 
     protected void removeExcessiveInProgressFiles(Deque<Exchange> exchanges, int limit) {
@@ -277,11 +284,28 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
     }
 
     /**
+     * Whether to ignore if the file cannot be retrieved.
+     * <p/>
+     * By default an {@link GenericFileOperationFailedException} is thrown if the file cannot be retrieved.
+     * <p/>
+     * This method allows to suppress this and just ignore that.
+     *
+     * @param name        the file name
+     * @param exchange    the exchange
+     * @return <tt>true</tt> to ignore, <tt>false</tt> is the default.
+     */
+    protected boolean ignoreCannotRetrieveFile(String name, Exchange exchange) {
+        return false;
+    }
+
+    /**
      * Processes the exchange
      *
      * @param exchange the exchange
+     * @return <tt>true</tt> if the file was started to be processed, <tt>false</tt> if the file was not started
+     * to be processed, for some reason (not found, or aborted etc)
      */
-    protected void processExchange(final Exchange exchange) {
+    protected boolean processExchange(final Exchange exchange) {
         GenericFile<T> file = getExchangeFileProperty(exchange);
         log.trace("Processing file: {}", file);
 
@@ -303,7 +327,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
                     // begin returned false, so remove file from the in progress list as its no longer in progress
                     endpoint.getInProgressRepository().remove(absoluteFileName);
                 }
-                return;
+                return false;
             }
         } catch (Exception e) {
             // remove file from the in progress list due to failure
@@ -311,7 +335,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
 
             String msg = endpoint + " cannot begin processing file: " + file + " due to: " + e.getMessage();
             handleException(msg, e);
-            return;
+            return false;
         }
 
         // must use file from exchange as it can be updated due the
@@ -328,10 +352,17 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
                 // retrieve the file and check it was a success
                 boolean retrieved = operations.retrieveFile(name, exchange);
                 if (!retrieved) {
-                    // throw exception to handle the problem with retrieving the file
-                    // then if the method return false or throws an exception is handled the same in here
-                    // as in both cases an exception is being thrown
-                    throw new GenericFileOperationFailedException("Cannot retrieve file: " + file + " from: " + endpoint);
+                    if (ignoreCannotRetrieveFile(name, exchange)) {
+                        log.trace("Cannot retrieve file {} maybe it does not exists. Ignorning.", name);
+                        // remove file from the in progress list as we could not retrieve it, but should ignore
+                        endpoint.getInProgressRepository().remove(absoluteFileName);
+                        return false;
+                    } else {
+                        // throw exception to handle the problem with retrieving the file
+                        // then if the method return false or throws an exception is handled the same in here
+                        // as in both cases an exception is being thrown
+                        throw new GenericFileOperationFailedException("Cannot retrieve file: " + file + " from: " + endpoint);
+                    }
                 }
     
                 log.trace("Retrieved file: {} from: {}", name, endpoint);                
@@ -368,6 +399,8 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
             String msg = "Error processing file " + file + " due to " + e.getMessage();
             handleException(msg, e);
         }
+
+        return true;
     }
 
     /**
@@ -385,7 +418,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
      * @param exchange the exchange
      * @param processor the custom processor
      */
-    protected void customProcessExchange(final Exchange exchange, final Processor processor) {
+    protected boolean customProcessExchange(final Exchange exchange, final Processor processor) {
         GenericFile<T> file = getExchangeFileProperty(exchange);
         log.trace("Custom processing file: {}", file);
 
@@ -407,6 +440,8 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
             // as the name can be different when using preMove option
             endpoint.getInProgressRepository().remove(absoluteFileName);
         }
+
+        return true;
     }
 
     /**
@@ -508,7 +543,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
 
         // use file expression for a simple dynamic file filter
         if (endpoint.getFileName() != null) {
-            evaluateFileExpression();
+            fileExpressionResult = evaluateFileExpression();
             if (fileExpressionResult != null) {
                 if (!name.equals(fileExpressionResult)) {
                     return false;
@@ -557,12 +592,13 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
         return !endpoint.getInProgressRepository().add(key);
     }
 
-    private void evaluateFileExpression() {
-        if (fileExpressionResult == null) {
+    protected String evaluateFileExpression() {
+        if (fileExpressionResult == null && endpoint.getFileName() != null) {
             // create a dummy exchange as Exchange is needed for expression evaluation
             Exchange dummy = endpoint.createExchange();
             fileExpressionResult = endpoint.getFileName().evaluate(dummy, String.class);
         }
+        return fileExpressionResult;
     }
 
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/camel/blob/c948c2b8/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
index 6b34ac7..98b9e42 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
@@ -185,6 +185,7 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R
                         if (begin) {
                             retryCounter++;
                             polledMessages = poll();
+                            LOG.trace("Polled {} messages", polledMessages);
 
                             if (polledMessages == 0 && isSendEmptyMessageWhenIdle()) {
                                 // send an "empty" exchange

http://git-wip-us.apache.org/repos/asf/camel/blob/c948c2b8/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
index 7b042ea..5d7dca5 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
@@ -16,8 +16,10 @@
  */
 package org.apache.camel.component.file.remote;
 
+import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.component.file.GenericFile;
 import org.apache.camel.util.FileUtil;
@@ -83,11 +85,24 @@ public class FtpConsumer extends RemoteFileConsumer<FTPFile> {
         }
 
         log.trace("Polling directory: {}", dir);
-        List<FTPFile> files;
-        if (isStepwise()) {
-            files = operations.listFiles();
+        List<FTPFile> files = null;
+        if (isUseList()) {
+            if (isStepwise()) {
+                files = operations.listFiles();
+            } else {
+                files = operations.listFiles(dir);
+            }
         } else {
-            files = operations.listFiles(dir);
+            // we cannot use the LIST command(s) so we can only poll a named file
+            // so created a pseudo file with that name
+            FTPFile file = new FTPFile();
+            file.setType(FTPFile.FILE_TYPE);
+            fileExpressionResult = evaluateFileExpression();
+            if (fileExpressionResult != null) {
+                file.setName(fileExpressionResult);
+                files = new ArrayList<FTPFile>(1);
+                files.add(file);
+            }
         }
 
         if (files == null || files.isEmpty()) {
@@ -149,6 +164,18 @@ public class FtpConsumer extends RemoteFileConsumer<FTPFile> {
         return false;
     }
 
+    @Override
+    protected boolean ignoreCannotRetrieveFile(String name, Exchange exchange) {
+        if (getEndpoint().getConfiguration().isIgnoreFileNotFound()) {
+            // error code 550 is file not found
+            int code = exchange.getIn().getHeader(FtpConstants.FTP_REPLY_CODE, 0, int.class);
+            if (code == 550) {
+                return true;
+            }
+        }
+        return super.ignoreCannotRetrieveFile(name, exchange);
+    }
+
     private RemoteFile<FTPFile> asRemoteFile(String absolutePath, FTPFile file) {
         RemoteFile<FTPFile> answer = new RemoteFile<FTPFile>();
 
@@ -193,6 +220,11 @@ public class FtpConsumer extends RemoteFileConsumer<FTPFile> {
         return config.isStepwise();
     }
 
+    private boolean isUseList() {
+        RemoteFileConfiguration config = (RemoteFileConfiguration) endpoint.getConfiguration();
+        return config.isUseList();
+    }
+
     @Override
     public String toString() {
         return "FtpConsumer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]";

http://git-wip-us.apache.org/repos/asf/camel/blob/c948c2b8/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java
index e5325ba..bbe0a18 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java
@@ -33,7 +33,7 @@ public abstract class RemoteFileConfiguration extends GenericFileConfiguration {
      * Windows = Path separator \ is used
      * Auto = Use existing path separator in file name
      */
-    public enum PathSeparator { UNIX, Windows, Auto };
+    public enum PathSeparator { UNIX, Windows, Auto }
 
     private String protocol;
     private String username;
@@ -50,6 +50,8 @@ public abstract class RemoteFileConfiguration extends GenericFileConfiguration {
     private boolean stepwise = true;
     private PathSeparator separator = PathSeparator.Auto;
     private boolean streamDownload;
+    private boolean useList = true;
+    private boolean ignoreFileNotFound;
 
     public RemoteFileConfiguration() {
     }
@@ -270,13 +272,40 @@ public abstract class RemoteFileConfiguration extends GenericFileConfiguration {
      * Sets the download method to use when not using a local working directory.  If set to true,
      * the remote files are streamed to the route as they are read.  When set to false, the remote files
      * are loaded into memory before being sent into the route.
-     *
-     * @param streamDownload 
      */
     public void setStreamDownload(boolean streamDownload) {
         this.streamDownload = streamDownload;
     }
 
+    public boolean isUseList() {
+        return useList;
+    }
+
+    /**
+     * Whether to allow using LIST command when downloading a file.
+     * <p/>
+     * Default is <tt>true</tt>. In some use cases you may want to download
+     * a specific file and are not allowed to use the LIST command, and therefore
+     * you can set this option to <tt>false</tt>.
+     */
+    public void setUseList(boolean useList) {
+        this.useList = useList;
+    }
+
+    public boolean isIgnoreFileNotFound() {
+        return ignoreFileNotFound;
+    }
+
+    /**
+     * Whether to ignore when trying to download a file which does not exist.
+     * <p/>
+     * By default when a file does not exists, then an exception is thrown.
+     * Setting this option to <tt>true</tt> allows to ignore that instead.
+     */
+    public void setIgnoreFileNotFound(boolean ignoreFileNotFound) {
+        this.ignoreFileNotFound = ignoreFileNotFound;
+    }
+
     /**
      * Normalizes the given path according to the configured path separator.
      *

http://git-wip-us.apache.org/repos/asf/camel/blob/c948c2b8/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
index 39e23ce..de242da 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
@@ -92,11 +92,11 @@ public abstract class RemoteFileConsumer<T> extends GenericFileConsumer<T> {
     }
 
     @Override
-    protected void processExchange(Exchange exchange) {
+    protected boolean processExchange(Exchange exchange) {
         // mark the exchange to be processed synchronously as the ftp client is not thread safe
         // and we must execute the callbacks in the same thread as this consumer
         exchange.setProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC, Boolean.TRUE);
-        super.processExchange(exchange);
+        return super.processExchange(exchange);
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/c948c2b8/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
index 1dbfd9d..765aac4 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.file.remote;
 import java.util.List;
 
 import com.jcraft.jsch.ChannelSftp;
+import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.component.file.GenericFile;
 import org.apache.camel.util.FileUtil;

http://git-wip-us.apache.org/repos/asf/camel/blob/c948c2b8/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpUseListFalseTest.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpUseListFalseTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpUseListFalseTest.java
new file mode 100644
index 0000000..afd2763
--- /dev/null
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpUseListFalseTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test to poll a fixed file from the FTP server without using the list command.
+ */
+public class FromFtpUseListFalseTest extends FtpServerTestSupport {
+
+    private String getFtpUrl() {
+        return "ftp://admin@localhost:" + getPort() + "/nolist/?password=admin"
+                + "&stepwise=false&useList=false&ignoreFileNotFound=true&fileName=report.txt&delete=true";
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        prepareFtpServer();
+    }
+
+    @Test
+    public void testUseListFalse() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World from FTPServer");
+
+        // just allow to poll a few more times, but we should only get the file once
+        Thread.sleep(1000);
+
+        mock.assertIsSatisfied();
+    }
+    
+    private void prepareFtpServer() throws Exception {
+        // prepares the FTP Server by creating a file on the server that we want to unit
+        // test that we can pool and store as a local file
+        Endpoint endpoint = context.getEndpoint("ftp://admin@localhost:" + getPort() + "/nolist/?password=admin&binary=false");
+        Exchange exchange = endpoint.createExchange();
+        exchange.getIn().setBody("Hello World from FTPServer");
+        exchange.getIn().setHeader(Exchange.FILE_NAME, "report.txt");
+        Producer producer = endpoint.createProducer();
+        producer.start();
+        producer.process(exchange);
+        producer.stop();
+    }
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from(getFtpUrl()).to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/c948c2b8/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerTemplateUseListFalseTest.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerTemplateUseListFalseTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerTemplateUseListFalseTest.java
new file mode 100644
index 0000000..6554e62
--- /dev/null
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerTemplateUseListFalseTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Producer;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test to poll a fixed file from the FTP server without using the list command.
+ */
+public class FtpConsumerTemplateUseListFalseTest extends FtpServerTestSupport {
+
+    private String getFtpUrl() {
+        return "ftp://admin@localhost:" + getPort() + "/nolist/?password=admin"
+                + "&stepwise=false&useList=false&ignoreFileNotFound=true&delete=true";
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        prepareFtpServer();
+    }
+
+    @Test
+    public void testUseListFalse() throws Exception {
+        String data = consumer.receiveBody(getFtpUrl() + "&fileName=report.txt", 5000, String.class);
+        assertEquals("Hello World from FTPServer", data);
+
+        // try a 2nd time and the file is deleted
+        data = consumer.receiveBody(getFtpUrl() + "&fileName=report.txt", 1000, String.class);
+        assertNull("The file should no longer exist", data);
+
+        // and try a non existing file name
+        data = consumer.receiveBody(getFtpUrl() + "&fileName=report2.txt", 1000, String.class);
+        assertNull("The file should no longer exist", data);
+    }
+    
+    private void prepareFtpServer() throws Exception {
+        // prepares the FTP Server by creating a file on the server that we want to unit
+        // test that we can pool and store as a local file
+        Endpoint endpoint = context.getEndpoint("ftp://admin@localhost:" + getPort() + "/nolist/?password=admin&binary=false");
+        Exchange exchange = endpoint.createExchange();
+        exchange.getIn().setBody("Hello World from FTPServer");
+        exchange.getIn().setHeader(Exchange.FILE_NAME, "report.txt");
+        Producer producer = endpoint.createProducer();
+        producer.start();
+        producer.process(exchange);
+        producer.stop();
+    }
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+}