You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/08/16 18:00:47 UTC

svn commit: r1158341 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/component/file/ camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/test/java/org/apache/camel/component/file/ components/camel-ftp/src/test/java/org/apache/ca...

Author: davsclaus
Date: Tue Aug 16 16:00:46 2011
New Revision: 1158341

URL: http://svn.apache.org/viewvc?rev=1158341&view=rev
Log:
CAMEL-4281: File/ftp endpoints is now BrowsableEndpoint which allows to browse the file system. It leverages the consumer logic, but without starting the scheduler.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBrowsableEndpointTest.java
    camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBrowsableEndpointTest.java
      - copied, changed from r1158296, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpToMockTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1158341&r1=1158340&r2=1158341&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Tue Aug 16 16:00:46 2011
@@ -38,7 +38,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Base class for remote file consumers.
+ * Base class for file consumers.
  */
 public abstract class GenericFileConsumer<T> extends ScheduledPollConsumer implements BatchConsumer, ShutdownAware {
     protected final transient Logger log = LoggerFactory.getLogger(getClass());
@@ -49,6 +49,7 @@ public abstract class GenericFileConsume
     protected int maxMessagesPerPoll;
     protected volatile ShutdownRunningTask shutdownRunningTask;
     protected volatile int pendingExchanges;
+    protected Processor customProcessor;
 
     public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor processor, GenericFileOperations<T> operations) {
         super(endpoint, processor);
@@ -56,6 +57,25 @@ public abstract class GenericFileConsume
         this.operations = operations;
     }
 
+    public Processor getCustomProcessor() {
+        return customProcessor;
+    }
+
+    /**
+     * Use a custom processor to process the exchange.
+     * <p/>
+     * Only set this if you need to do custom processing, instead of the regular processing.
+     * <p/>
+     * This is for example used to browse file endpoints by leveraging the file consumer to poll
+     * the directory to gather the list of exchanges. But to avoid processing the files regularly
+     * we can use a custom processor.
+     *
+     * @param processor a custom processor
+     */
+    public void setCustomProcessor(Processor processor) {
+        this.customProcessor = processor;
+    }
+
     /**
      * Poll for files
      */
@@ -148,7 +168,13 @@ public abstract class GenericFileConsume
             pendingExchanges = total - index - 1;
 
             // process the current exchange
-            processExchange(exchange);
+            if (customProcessor != null) {
+                // use a custom processor
+                customProcessExchange(exchange, customProcessor);
+            } else {
+                // process the exchange regular
+                processExchange(exchange);
+            }
         }
 
         // remove the file from the in progress list in case the batch was limited by max messages per poll
@@ -332,6 +358,35 @@ public abstract class GenericFileConsume
     }
 
     /**
+     * Processes the exchange using a custom processor.
+     *
+     * @param exchange the exchange
+     * @param processor the custom processor
+     */
+    protected void customProcessExchange(final Exchange exchange, final Processor processor) {
+        GenericFile<T> file = getExchangeFileProperty(exchange);
+        log.trace("Custom processing file: {}", file);
+
+        // must extract the absolute name before the begin strategy as the file could potentially be pre moved
+        // and then the file name would be changed
+        String absoluteFileName = file.getAbsoluteFilePath();
+
+        try {
+            // process using the custom processor
+            processor.process(exchange);
+        } catch (Exception e) {
+            if (log.isDebugEnabled()) {
+                log.debug(endpoint + " error custom processing: " + file + " due to: " + e.getMessage() + ". This exception will be ignored.", e);
+            }
+        } finally {
+            // always remove file from the in progress list as its no longer in progress
+            // use the original file name that was used to add it to the repository
+            // as the name can be different when using preMove option
+            endpoint.getInProgressRepository().remove(absoluteFileName);
+        }
+    }
+
+    /**
      * Strategy for validating if the given remote file should be included or not
      *
      * @param file        the file

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java?rev=1158341&r1=1158340&r2=1158341&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java Tue Aug 16 16:00:46 2011
@@ -19,8 +19,10 @@ package org.apache.camel.component.file;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.camel.CamelContext;
@@ -33,6 +35,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.converter.IOConverter;
 import org.apache.camel.impl.ScheduledPollEndpoint;
 import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
+import org.apache.camel.spi.BrowsableEndpoint;
 import org.apache.camel.spi.FactoryFinder;
 import org.apache.camel.spi.IdempotentRepository;
 import org.apache.camel.spi.Language;
@@ -44,9 +47,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Generic FileEndpoint
+ * Base class for file endpoints
  */
-public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint {
+public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint implements BrowsableEndpoint {
 
     protected static final transient String DEFAULT_STRATEGYFACTORY_CLASS = "org.apache.camel.component.file.strategy.GenericFileProcessStrategyFactory";
     protected static final transient int DEFAULT_IDEMPOTENT_CACHE_SIZE = 1000;
@@ -132,6 +135,44 @@ public abstract class GenericFileEndpoin
     }
 
     /**
+     * This implementation will <b>not</b> load the file content.
+     * Any file locking is neither in use by this implementation..
+     */
+    @Override
+    public List<Exchange> getExchanges() {
+        final List<Exchange> answer = new ArrayList<Exchange>();
+
+        GenericFileConsumer consumer = null;
+        try {
+            // create a new consumer which can poll the exchanges we want to browse
+            // do not provide a processor as we do some custom processing
+            consumer = createConsumer(null);
+            consumer.setCustomProcessor(new Processor() {
+                @Override
+                public void process(Exchange exchange) throws Exception {
+                    answer.add(exchange);
+                }
+            });
+            // do not start scheduler, as we invoke the poll manually
+            consumer.setStartScheduler(false);
+            // start consumer
+            ServiceHelper.startService(consumer);
+            // invoke poll which performs the custom processing, so we can browse the exchanges
+            consumer.poll();
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        } finally {
+            try {
+                ServiceHelper.stopService(consumer);
+            } catch (Exception e) {
+                log.debug("Error stopping consumer used for browsing exchanges. This exception will be ignored", e);
+            }
+        }
+
+        return answer;
+    }
+
+    /**
      * A strategy method to lazily create the file strategy
      */
     @SuppressWarnings("unchecked")

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=1158341&r1=1158340&r2=1158341&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Tue Aug 16 16:00:46 2011
@@ -43,6 +43,7 @@ public abstract class ScheduledPollConsu
     private ScheduledFuture<?> future;
 
     // if adding more options then align with ScheduledPollEndpoint#configureScheduledPollConsumerProperties
+    private boolean startScheduler = true;
     private long initialDelay = 1000;
     private long delay = 500;
     private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
@@ -227,6 +228,21 @@ public abstract class ScheduledPollConsu
         this.pollStrategy = pollStrategy;
     }
 
+    public boolean isStartScheduler() {
+        return startScheduler;
+    }
+
+    /**
+     * Sets whether the scheduler should be started when this consumer starts.
+     * <p/>
+     * This option is default true.
+     *
+     * @param startScheduler whether to start scheduler
+     */
+    public void setStartScheduler(boolean startScheduler) {
+        this.startScheduler = startScheduler;
+    }
+
     // Implementation methods
     // -------------------------------------------------------------------------
 
@@ -244,6 +260,12 @@ public abstract class ScheduledPollConsu
         ObjectHelper.notNull(executor, "executor", this);
         ObjectHelper.notNull(pollStrategy, "pollStrategy", this);
 
+        if (isStartScheduler()) {
+            startScheduler();
+        }
+    }
+
+    protected void startScheduler() {
         if (isUseFixedDelay()) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Scheduling poll (fixed delay) with initialDelay: {}, delay: {} ({}) for: {}",

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java?rev=1158341&r1=1158340&r2=1158341&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java Tue Aug 16 16:00:46 2011
@@ -86,19 +86,24 @@ public abstract class ScheduledPollEndpo
     private void configureScheduledPollConsumerProperties(Map<String, Object> options, Map<String, Object> consumerProperties) {
         // special for scheduled poll consumers as we want to allow end users to configure its options
         // from the URI parameters without the consumer. prefix
+        Object startScheduler = options.remove("startScheduler");
         Object initialDelay = options.remove("initialDelay");
         Object delay = options.remove("delay");
         Object timeUnit = options.remove("timeUnit");
         Object useFixedDelay = options.remove("useFixedDelay");
         Object pollStrategy = options.remove("pollStrategy");
         Object runLoggingLevel = options.remove("runLoggingLevel");
-        if (initialDelay != null || delay != null || timeUnit != null || useFixedDelay != null || pollStrategy != null || runLoggingLevel != null) {
+        if (initialDelay != null || delay != null || timeUnit != null || useFixedDelay != null || pollStrategy != null
+                || runLoggingLevel != null || startScheduler != null) {
             if (consumerProperties == null) {
                 consumerProperties = new HashMap<String, Object>();
             }
             if (initialDelay != null) {
                 consumerProperties.put("initialDelay", initialDelay);
             }
+            if (startScheduler != null) {
+                consumerProperties.put("startScheduler", startScheduler);
+            }
             if (delay != null) {
                 consumerProperties.put("delay", delay);
             }

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBrowsableEndpointTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBrowsableEndpointTest.java?rev=1158341&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBrowsableEndpointTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBrowsableEndpointTest.java Tue Aug 16 16:00:46 2011
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import java.io.File;
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
+import org.apache.camel.spi.BrowsableEndpoint;
+
+/**
+ *
+ */
+public class FileBrowsableEndpointTest extends ContextTestSupport {
+
+    @Override
+    protected void setUp() throws Exception {
+        deleteDirectory("target/browse");
+        super.setUp();
+    }
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    public void testBrowsableNoFiles() throws Exception {
+        BrowsableEndpoint browse = context.getEndpoint("file:target/browse", BrowsableEndpoint.class);
+        assertNotNull(browse);
+
+        List<Exchange> list = browse.getExchanges();
+        assertNotNull(list);
+        assertEquals(0, list.size());
+    }
+
+    public void testBrowsableOneFile() throws Exception {
+        template.sendBodyAndHeader("file:target/browse", "A", Exchange.FILE_NAME, "a.txt");
+
+        FileEndpoint endpoint = context.getEndpoint("file:target/browse", FileEndpoint.class);
+        assertNotNull(endpoint);
+
+        MemoryIdempotentRepository repo = (MemoryIdempotentRepository) endpoint.getInProgressRepository();
+        assertEquals(0, repo.getCacheSize());
+
+        List<Exchange> list = endpoint.getExchanges();
+        assertNotNull(list);
+        assertEquals(1, list.size());
+
+        assertEquals("a.txt", list.get(0).getIn().getHeader(Exchange.FILE_NAME));
+
+        // the in progress repo should not leak
+        assertEquals(0, repo.getCacheSize());
+
+        // and the file is still there
+        File file = new File("target/browse/a.txt");
+        assertTrue("File should exist " + file, file.exists());
+    }
+
+    public void testBrowsableTwoFiles() throws Exception {
+        template.sendBodyAndHeader("file:target/browse", "A", Exchange.FILE_NAME, "a.txt");
+        template.sendBodyAndHeader("file:target/browse", "B", Exchange.FILE_NAME, "b.txt");
+
+        FileEndpoint endpoint = context.getEndpoint("file:target/browse?sortBy=file:name", FileEndpoint.class);
+        assertNotNull(endpoint);
+
+        MemoryIdempotentRepository repo = (MemoryIdempotentRepository) endpoint.getInProgressRepository();
+        assertEquals(0, repo.getCacheSize());
+
+        List<Exchange> list = endpoint.getExchanges();
+        assertNotNull(list);
+        assertEquals(2, list.size());
+
+        assertEquals("a.txt", list.get(0).getIn().getHeader(Exchange.FILE_NAME));
+        assertEquals("b.txt", list.get(1).getIn().getHeader(Exchange.FILE_NAME));
+
+        // the in progress repo should not leak
+        assertEquals(0, repo.getCacheSize());
+
+        // and the files is still there
+        File fileA = new File("target/browse/a.txt");
+        assertTrue("File should exist " + fileA, fileA.exists());
+        File fileB = new File("target/browse/b.txt");
+        assertTrue("File should exist " + fileB, fileB.exists());
+    }
+
+    public void testBrowsableThreeFilesRecursive() throws Exception {
+        template.sendBodyAndHeader("file:target/browse", "A", Exchange.FILE_NAME, "a.txt");
+        template.sendBodyAndHeader("file:target/browse", "B", Exchange.FILE_NAME, "foo/b.txt");
+        template.sendBodyAndHeader("file:target/browse", "C", Exchange.FILE_NAME, "bar/c.txt");
+
+        FileEndpoint endpoint = context.getEndpoint("file:target/browse?recursive=true&sortBy=file:name", FileEndpoint.class);
+        assertNotNull(endpoint);
+
+        MemoryIdempotentRepository repo = (MemoryIdempotentRepository) endpoint.getInProgressRepository();
+        assertEquals(0, repo.getCacheSize());
+
+        List<Exchange> list = endpoint.getExchanges();
+        assertNotNull(list);
+        assertEquals(3, list.size());
+
+        assertEquals("a.txt", list.get(0).getIn().getHeader(Exchange.FILE_NAME));
+        assertEquals("c.txt", list.get(1).getIn().getHeader(Exchange.FILE_NAME_ONLY));
+        assertEquals("b.txt", list.get(2).getIn().getHeader(Exchange.FILE_NAME_ONLY));
+
+        // the in progress repo should not leak
+        assertEquals(0, repo.getCacheSize());
+
+        // and the files is still there
+        File fileA = new File("target/browse/a.txt");
+        assertTrue("File should exist " + fileA, fileA.exists());
+        File fileB = new File("target/browse/foo/b.txt");
+        assertTrue("File should exist " + fileB, fileB.exists());
+        File fileC = new File("target/browse/bar/c.txt");
+        assertTrue("File should exist " + fileC, fileC.exists());
+    }
+}

Copied: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBrowsableEndpointTest.java (from r1158296, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpToMockTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBrowsableEndpointTest.java?p2=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBrowsableEndpointTest.java&p1=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpToMockTest.java&r1=1158296&r2=1158341&rev=1158341&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpToMockTest.java (original)
+++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBrowsableEndpointTest.java Tue Aug 16 16:00:46 2011
@@ -16,35 +16,123 @@
  */
 package org.apache.camel.component.file.remote;
 
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
+import java.io.File;
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
+import org.apache.camel.spi.BrowsableEndpoint;
 import org.junit.Test;
 
 /**
  * @version 
  */
-public class FromFtpToMockTest extends FtpServerTestSupport {
-    protected String expectedBody = "Hello there!";
+public class FtpBrowsableEndpointTest extends FtpServerTestSupport {
 
     private String getFtpUrl() {
-        return "ftp://admin@localhost:" + getPort() + "/tmp/camel?password=admin&recursive=true";
+        return "ftp://admin@localhost:" + getPort() + "/browse?password=admin";
+    }
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    @Test
+    public void testBrowsableNoFiles() throws Exception {
+        // make sure starting directory exists
+        createDirectory(FTP_ROOT_DIR + "browse");
+
+        BrowsableEndpoint browse = context.getEndpoint(getFtpUrl(), BrowsableEndpoint.class);
+        assertNotNull(browse);
+
+        List<Exchange> list = browse.getExchanges();
+        assertNotNull(list);
+        assertEquals(0, list.size());
     }
 
     @Test
-    public void testFtpRoute() throws Exception {
-        MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
-        resultEndpoint.expectedBodiesReceived(expectedBody);
+    public void testBrowsableOneFile() throws Exception {
+        template.sendBodyAndHeader(getFtpUrl(), "A", Exchange.FILE_NAME, "a.txt");
+
+        FtpEndpoint endpoint = context.getEndpoint(getFtpUrl(), FtpEndpoint.class);
+        assertNotNull(endpoint);
+
+        MemoryIdempotentRepository repo = (MemoryIdempotentRepository) endpoint.getInProgressRepository();
+        assertEquals(0, repo.getCacheSize());
 
-        template.sendBodyAndHeader(getFtpUrl(), expectedBody, "cheese", 123);
+        List<Exchange> list = endpoint.getExchanges();
+        assertNotNull(list);
+        assertEquals(1, list.size());
 
-        resultEndpoint.assertIsSatisfied();
+        assertEquals("a.txt", list.get(0).getIn().getHeader(Exchange.FILE_NAME));
+
+        // the in progress repo should not leak
+        assertEquals(0, repo.getCacheSize());
+
+        // and the file is still there
+        File file = new File(FTP_ROOT_DIR + "browse/a.txt");
+        assertTrue("File should exist " + file, file.exists());
     }
 
-    protected RouteBuilder createRouteBuilder() throws Exception {
-        return new RouteBuilder() {
-            public void configure() throws Exception {
-                from(getFtpUrl()).to("mock:result");
-            }
-        };
+    @Test
+    public void testBrowsableTwoFiles() throws Exception {
+        template.sendBodyAndHeader(getFtpUrl(), "A", Exchange.FILE_NAME, "a.txt");
+        template.sendBodyAndHeader(getFtpUrl(), "B", Exchange.FILE_NAME, "b.txt");
+
+        FtpEndpoint endpoint = context.getEndpoint(getFtpUrl() + "&sortBy=file:name", FtpEndpoint.class);
+        assertNotNull(endpoint);
+
+        MemoryIdempotentRepository repo = (MemoryIdempotentRepository) endpoint.getInProgressRepository();
+        assertEquals(0, repo.getCacheSize());
+
+        List<Exchange> list = endpoint.getExchanges();
+        assertNotNull(list);
+        assertEquals(2, list.size());
+
+        assertEquals("a.txt", list.get(0).getIn().getHeader(Exchange.FILE_NAME));
+        assertEquals("b.txt", list.get(1).getIn().getHeader(Exchange.FILE_NAME));
+
+        // the in progress repo should not leak
+        assertEquals(0, repo.getCacheSize());
+
+        // and the files is still there
+        File fileA = new File(FTP_ROOT_DIR + "browse/a.txt");
+        assertTrue("File should exist " + fileA, fileA.exists());
+        File fileB = new File(FTP_ROOT_DIR + "browse/b.txt");
+        assertTrue("File should exist " + fileB, fileB.exists());
     }
+
+    @Test
+    public void testBrowsableThreeFilesRecursive() throws Exception {
+        template.sendBodyAndHeader(getFtpUrl(), "A", Exchange.FILE_NAME, "a.txt");
+        template.sendBodyAndHeader(getFtpUrl(), "B", Exchange.FILE_NAME, "foo/b.txt");
+        template.sendBodyAndHeader(getFtpUrl(), "C", Exchange.FILE_NAME, "bar/c.txt");
+
+        FtpEndpoint endpoint = context.getEndpoint(getFtpUrl() + "&recursive=true&sortBy=file:name", FtpEndpoint.class);
+        assertNotNull(endpoint);
+
+        MemoryIdempotentRepository repo = (MemoryIdempotentRepository) endpoint.getInProgressRepository();
+        assertEquals(0, repo.getCacheSize());
+
+        List<Exchange> list = endpoint.getExchanges();
+        assertNotNull(list);
+        assertEquals(3, list.size());
+
+        assertEquals("a.txt", list.get(0).getIn().getHeader(Exchange.FILE_NAME));
+        assertEquals("c.txt", list.get(1).getIn().getHeader(Exchange.FILE_NAME_ONLY));
+        assertEquals("b.txt", list.get(2).getIn().getHeader(Exchange.FILE_NAME_ONLY));
+
+        // the in progress repo should not leak
+        assertEquals(0, repo.getCacheSize());
+
+        // and the files is still there
+        File fileA = new File(FTP_ROOT_DIR + "browse/a.txt");
+        assertTrue("File should exist " + fileA, fileA.exists());
+        File fileB = new File(FTP_ROOT_DIR + "browse/foo/b.txt");
+        assertTrue("File should exist " + fileB, fileB.exists());
+        File fileC = new File(FTP_ROOT_DIR + "browse/bar/c.txt");
+        assertTrue("File should exist " + fileC, fileC.exists());
+    }
+
 }
\ No newline at end of file