You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/08/16 18:00:47 UTC
svn commit: r1158341 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/component/file/
camel-core/src/main/java/org/apache/camel/impl/
camel-core/src/test/java/org/apache/camel/component/file/
components/camel-ftp/src/test/java/org/apache/ca...
Author: davsclaus
Date: Tue Aug 16 16:00:46 2011
New Revision: 1158341
URL: http://svn.apache.org/viewvc?rev=1158341&view=rev
Log:
CAMEL-4281: File/FTP endpoints are now BrowsableEndpoints, which allows browsing the file system. This leverages the consumer logic, but without starting the scheduler.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBrowsableEndpointTest.java
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBrowsableEndpointTest.java
- copied, changed from r1158296, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpToMockTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1158341&r1=1158340&r2=1158341&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Tue Aug 16 16:00:46 2011
@@ -38,7 +38,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Base class for remote file consumers.
+ * Base class for file consumers.
*/
public abstract class GenericFileConsumer<T> extends ScheduledPollConsumer implements BatchConsumer, ShutdownAware {
protected final transient Logger log = LoggerFactory.getLogger(getClass());
@@ -49,6 +49,7 @@ public abstract class GenericFileConsume
protected int maxMessagesPerPoll;
protected volatile ShutdownRunningTask shutdownRunningTask;
protected volatile int pendingExchanges;
+ protected Processor customProcessor;
public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor processor, GenericFileOperations<T> operations) {
super(endpoint, processor);
@@ -56,6 +57,25 @@ public abstract class GenericFileConsume
this.operations = operations;
}
+ public Processor getCustomProcessor() {
+ return customProcessor;
+ }
+
+ /**
+ * Use a custom processor to process the exchange.
+ * <p/>
+ * Only set this if you need to do custom processing, instead of the regular processing.
+ * <p/>
+ * This is for example used to browse file endpoints by leveraging the file consumer to poll
+ * the directory to gather the list of exchanges. But to avoid processing the files regularly
+ * we can use a custom processor.
+ *
+ * @param processor a custom processor
+ */
+ public void setCustomProcessor(Processor processor) {
+ this.customProcessor = processor;
+ }
+
/**
* Poll for files
*/
@@ -148,7 +168,13 @@ public abstract class GenericFileConsume
pendingExchanges = total - index - 1;
// process the current exchange
- processExchange(exchange);
+ if (customProcessor != null) {
+ // use a custom processor
+ customProcessExchange(exchange, customProcessor);
+ } else {
+ // process the exchange the regular way
+ processExchange(exchange);
+ }
}
// remove the file from the in progress list in case the batch was limited by max messages per poll
@@ -332,6 +358,35 @@ public abstract class GenericFileConsume
}
/**
+ * Processes the exchange using a custom processor.
+ *
+ * @param exchange the exchange
+ * @param processor the custom processor
+ */
+ protected void customProcessExchange(final Exchange exchange, final Processor processor) {
+ GenericFile<T> file = getExchangeFileProperty(exchange);
+ log.trace("Custom processing file: {}", file);
+
+ // must extract the absolute name before the begin strategy as the file could potentially be pre moved
+ // and then the file name would be changed
+ String absoluteFileName = file.getAbsoluteFilePath();
+
+ try {
+ // process using the custom processor
+ processor.process(exchange);
+ } catch (Exception e) {
+ if (log.isDebugEnabled()) {
+ log.debug(endpoint + " error custom processing: " + file + " due to: " + e.getMessage() + ". This exception will be ignored.", e);
+ }
+ } finally {
+ // always remove file from the in progress list as it's no longer in progress
+ // use the original file name that was used to add it to the repository
+ // as the name can be different when using preMove option
+ endpoint.getInProgressRepository().remove(absoluteFileName);
+ }
+ }
+
+ /**
* Strategy for validating if the given remote file should be included or not
*
* @param file the file
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java?rev=1158341&r1=1158340&r2=1158341&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java Tue Aug 16 16:00:46 2011
@@ -19,8 +19,10 @@ package org.apache.camel.component.file;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.camel.CamelContext;
@@ -33,6 +35,7 @@ import org.apache.camel.Processor;
import org.apache.camel.converter.IOConverter;
import org.apache.camel.impl.ScheduledPollEndpoint;
import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
+import org.apache.camel.spi.BrowsableEndpoint;
import org.apache.camel.spi.FactoryFinder;
import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.spi.Language;
@@ -44,9 +47,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Generic FileEndpoint
+ * Base class for file endpoints
*/
-public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint {
+public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint implements BrowsableEndpoint {
protected static final transient String DEFAULT_STRATEGYFACTORY_CLASS = "org.apache.camel.component.file.strategy.GenericFileProcessStrategyFactory";
protected static final transient int DEFAULT_IDEMPOTENT_CACHE_SIZE = 1000;
@@ -132,6 +135,44 @@ public abstract class GenericFileEndpoin
}
/**
+ * This implementation will <b>not</b> load the file content.
+ * File locking is not used by this implementation either.
+ */
+ @Override
+ public List<Exchange> getExchanges() {
+ final List<Exchange> answer = new ArrayList<Exchange>();
+
+ GenericFileConsumer consumer = null;
+ try {
+ // create a new consumer which can poll the exchanges we want to browse
+ // do not provide a processor as we do some custom processing
+ consumer = createConsumer(null);
+ consumer.setCustomProcessor(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ answer.add(exchange);
+ }
+ });
+ // do not start scheduler, as we invoke the poll manually
+ consumer.setStartScheduler(false);
+ // start consumer
+ ServiceHelper.startService(consumer);
+ // invoke poll which performs the custom processing, so we can browse the exchanges
+ consumer.poll();
+ } catch (Exception e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ } finally {
+ try {
+ ServiceHelper.stopService(consumer);
+ } catch (Exception e) {
+ log.debug("Error stopping consumer used for browsing exchanges. This exception will be ignored", e);
+ }
+ }
+
+ return answer;
+ }
+
+ /**
* A strategy method to lazily create the file strategy
*/
@SuppressWarnings("unchecked")
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=1158341&r1=1158340&r2=1158341&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Tue Aug 16 16:00:46 2011
@@ -43,6 +43,7 @@ public abstract class ScheduledPollConsu
private ScheduledFuture<?> future;
// if adding more options then align with ScheduledPollEndpoint#configureScheduledPollConsumerProperties
+ private boolean startScheduler = true;
private long initialDelay = 1000;
private long delay = 500;
private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
@@ -227,6 +228,21 @@ public abstract class ScheduledPollConsu
this.pollStrategy = pollStrategy;
}
+ public boolean isStartScheduler() {
+ return startScheduler;
+ }
+
+ /**
+ * Sets whether the scheduler should be started when this consumer starts.
+ * <p/>
+ * This option defaults to true.
+ *
+ * @param startScheduler whether to start scheduler
+ */
+ public void setStartScheduler(boolean startScheduler) {
+ this.startScheduler = startScheduler;
+ }
+
// Implementation methods
// -------------------------------------------------------------------------
@@ -244,6 +260,12 @@ public abstract class ScheduledPollConsu
ObjectHelper.notNull(executor, "executor", this);
ObjectHelper.notNull(pollStrategy, "pollStrategy", this);
+ if (isStartScheduler()) {
+ startScheduler();
+ }
+ }
+
+ protected void startScheduler() {
if (isUseFixedDelay()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling poll (fixed delay) with initialDelay: {}, delay: {} ({}) for: {}",
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java?rev=1158341&r1=1158340&r2=1158341&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java Tue Aug 16 16:00:46 2011
@@ -86,19 +86,24 @@ public abstract class ScheduledPollEndpo
private void configureScheduledPollConsumerProperties(Map<String, Object> options, Map<String, Object> consumerProperties) {
// special for scheduled poll consumers as we want to allow end users to configure its options
// from the URI parameters without the consumer. prefix
+ Object startScheduler = options.remove("startScheduler");
Object initialDelay = options.remove("initialDelay");
Object delay = options.remove("delay");
Object timeUnit = options.remove("timeUnit");
Object useFixedDelay = options.remove("useFixedDelay");
Object pollStrategy = options.remove("pollStrategy");
Object runLoggingLevel = options.remove("runLoggingLevel");
- if (initialDelay != null || delay != null || timeUnit != null || useFixedDelay != null || pollStrategy != null || runLoggingLevel != null) {
+ if (initialDelay != null || delay != null || timeUnit != null || useFixedDelay != null || pollStrategy != null
+ || runLoggingLevel != null || startScheduler != null) {
if (consumerProperties == null) {
consumerProperties = new HashMap<String, Object>();
}
if (initialDelay != null) {
consumerProperties.put("initialDelay", initialDelay);
}
+ if (startScheduler != null) {
+ consumerProperties.put("startScheduler", startScheduler);
+ }
if (delay != null) {
consumerProperties.put("delay", delay);
}
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBrowsableEndpointTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBrowsableEndpointTest.java?rev=1158341&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBrowsableEndpointTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBrowsableEndpointTest.java Tue Aug 16 16:00:46 2011
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import java.io.File;
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
+import org.apache.camel.spi.BrowsableEndpoint;
+
+/**
+ *
+ */
+public class FileBrowsableEndpointTest extends ContextTestSupport {
+
+ @Override
+ protected void setUp() throws Exception {
+ deleteDirectory("target/browse");
+ super.setUp();
+ }
+
+ @Override
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
+
+ public void testBrowsableNoFiles() throws Exception {
+ BrowsableEndpoint browse = context.getEndpoint("file:target/browse", BrowsableEndpoint.class);
+ assertNotNull(browse);
+
+ List<Exchange> list = browse.getExchanges();
+ assertNotNull(list);
+ assertEquals(0, list.size());
+ }
+
+ public void testBrowsableOneFile() throws Exception {
+ template.sendBodyAndHeader("file:target/browse", "A", Exchange.FILE_NAME, "a.txt");
+
+ FileEndpoint endpoint = context.getEndpoint("file:target/browse", FileEndpoint.class);
+ assertNotNull(endpoint);
+
+ MemoryIdempotentRepository repo = (MemoryIdempotentRepository) endpoint.getInProgressRepository();
+ assertEquals(0, repo.getCacheSize());
+
+ List<Exchange> list = endpoint.getExchanges();
+ assertNotNull(list);
+ assertEquals(1, list.size());
+
+ assertEquals("a.txt", list.get(0).getIn().getHeader(Exchange.FILE_NAME));
+
+ // the in progress repo should not leak
+ assertEquals(0, repo.getCacheSize());
+
+ // and the file is still there
+ File file = new File("target/browse/a.txt");
+ assertTrue("File should exist " + file, file.exists());
+ }
+
+ public void testBrowsableTwoFiles() throws Exception {
+ template.sendBodyAndHeader("file:target/browse", "A", Exchange.FILE_NAME, "a.txt");
+ template.sendBodyAndHeader("file:target/browse", "B", Exchange.FILE_NAME, "b.txt");
+
+ FileEndpoint endpoint = context.getEndpoint("file:target/browse?sortBy=file:name", FileEndpoint.class);
+ assertNotNull(endpoint);
+
+ MemoryIdempotentRepository repo = (MemoryIdempotentRepository) endpoint.getInProgressRepository();
+ assertEquals(0, repo.getCacheSize());
+
+ List<Exchange> list = endpoint.getExchanges();
+ assertNotNull(list);
+ assertEquals(2, list.size());
+
+ assertEquals("a.txt", list.get(0).getIn().getHeader(Exchange.FILE_NAME));
+ assertEquals("b.txt", list.get(1).getIn().getHeader(Exchange.FILE_NAME));
+
+ // the in progress repo should not leak
+ assertEquals(0, repo.getCacheSize());
+
+ // and the files are still there
+ File fileA = new File("target/browse/a.txt");
+ assertTrue("File should exist " + fileA, fileA.exists());
+ File fileB = new File("target/browse/b.txt");
+ assertTrue("File should exist " + fileB, fileB.exists());
+ }
+
+ public void testBrowsableThreeFilesRecursive() throws Exception {
+ template.sendBodyAndHeader("file:target/browse", "A", Exchange.FILE_NAME, "a.txt");
+ template.sendBodyAndHeader("file:target/browse", "B", Exchange.FILE_NAME, "foo/b.txt");
+ template.sendBodyAndHeader("file:target/browse", "C", Exchange.FILE_NAME, "bar/c.txt");
+
+ FileEndpoint endpoint = context.getEndpoint("file:target/browse?recursive=true&sortBy=file:name", FileEndpoint.class);
+ assertNotNull(endpoint);
+
+ MemoryIdempotentRepository repo = (MemoryIdempotentRepository) endpoint.getInProgressRepository();
+ assertEquals(0, repo.getCacheSize());
+
+ List<Exchange> list = endpoint.getExchanges();
+ assertNotNull(list);
+ assertEquals(3, list.size());
+
+ assertEquals("a.txt", list.get(0).getIn().getHeader(Exchange.FILE_NAME));
+ assertEquals("c.txt", list.get(1).getIn().getHeader(Exchange.FILE_NAME_ONLY));
+ assertEquals("b.txt", list.get(2).getIn().getHeader(Exchange.FILE_NAME_ONLY));
+
+ // the in progress repo should not leak
+ assertEquals(0, repo.getCacheSize());
+
+ // and the files are still there
+ File fileA = new File("target/browse/a.txt");
+ assertTrue("File should exist " + fileA, fileA.exists());
+ File fileB = new File("target/browse/foo/b.txt");
+ assertTrue("File should exist " + fileB, fileB.exists());
+ File fileC = new File("target/browse/bar/c.txt");
+ assertTrue("File should exist " + fileC, fileC.exists());
+ }
+}
Copied: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBrowsableEndpointTest.java (from r1158296, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpToMockTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBrowsableEndpointTest.java?p2=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBrowsableEndpointTest.java&p1=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpToMockTest.java&r1=1158296&r2=1158341&rev=1158341&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpToMockTest.java (original)
+++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBrowsableEndpointTest.java Tue Aug 16 16:00:46 2011
@@ -16,35 +16,123 @@
*/
package org.apache.camel.component.file.remote;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
+import java.io.File;
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
+import org.apache.camel.spi.BrowsableEndpoint;
import org.junit.Test;
/**
* @version
*/
-public class FromFtpToMockTest extends FtpServerTestSupport {
- protected String expectedBody = "Hello there!";
+public class FtpBrowsableEndpointTest extends FtpServerTestSupport {
private String getFtpUrl() {
- return "ftp://admin@localhost:" + getPort() + "/tmp/camel?password=admin&recursive=true";
+ return "ftp://admin@localhost:" + getPort() + "/browse?password=admin";
+ }
+
+ @Override
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
+
+ @Test
+ public void testBrowsableNoFiles() throws Exception {
+ // make sure starting directory exists
+ createDirectory(FTP_ROOT_DIR + "browse");
+
+ BrowsableEndpoint browse = context.getEndpoint(getFtpUrl(), BrowsableEndpoint.class);
+ assertNotNull(browse);
+
+ List<Exchange> list = browse.getExchanges();
+ assertNotNull(list);
+ assertEquals(0, list.size());
}
@Test
- public void testFtpRoute() throws Exception {
- MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
- resultEndpoint.expectedBodiesReceived(expectedBody);
+ public void testBrowsableOneFile() throws Exception {
+ template.sendBodyAndHeader(getFtpUrl(), "A", Exchange.FILE_NAME, "a.txt");
+
+ FtpEndpoint endpoint = context.getEndpoint(getFtpUrl(), FtpEndpoint.class);
+ assertNotNull(endpoint);
+
+ MemoryIdempotentRepository repo = (MemoryIdempotentRepository) endpoint.getInProgressRepository();
+ assertEquals(0, repo.getCacheSize());
- template.sendBodyAndHeader(getFtpUrl(), expectedBody, "cheese", 123);
+ List<Exchange> list = endpoint.getExchanges();
+ assertNotNull(list);
+ assertEquals(1, list.size());
- resultEndpoint.assertIsSatisfied();
+ assertEquals("a.txt", list.get(0).getIn().getHeader(Exchange.FILE_NAME));
+
+ // the in progress repo should not leak
+ assertEquals(0, repo.getCacheSize());
+
+ // and the file is still there
+ File file = new File(FTP_ROOT_DIR + "browse/a.txt");
+ assertTrue("File should exist " + file, file.exists());
}
- protected RouteBuilder createRouteBuilder() throws Exception {
- return new RouteBuilder() {
- public void configure() throws Exception {
- from(getFtpUrl()).to("mock:result");
- }
- };
+ @Test
+ public void testBrowsableTwoFiles() throws Exception {
+ template.sendBodyAndHeader(getFtpUrl(), "A", Exchange.FILE_NAME, "a.txt");
+ template.sendBodyAndHeader(getFtpUrl(), "B", Exchange.FILE_NAME, "b.txt");
+
+ FtpEndpoint endpoint = context.getEndpoint(getFtpUrl() + "&sortBy=file:name", FtpEndpoint.class);
+ assertNotNull(endpoint);
+
+ MemoryIdempotentRepository repo = (MemoryIdempotentRepository) endpoint.getInProgressRepository();
+ assertEquals(0, repo.getCacheSize());
+
+ List<Exchange> list = endpoint.getExchanges();
+ assertNotNull(list);
+ assertEquals(2, list.size());
+
+ assertEquals("a.txt", list.get(0).getIn().getHeader(Exchange.FILE_NAME));
+ assertEquals("b.txt", list.get(1).getIn().getHeader(Exchange.FILE_NAME));
+
+ // the in progress repo should not leak
+ assertEquals(0, repo.getCacheSize());
+
+ // and the files are still there
+ File fileA = new File(FTP_ROOT_DIR + "browse/a.txt");
+ assertTrue("File should exist " + fileA, fileA.exists());
+ File fileB = new File(FTP_ROOT_DIR + "browse/b.txt");
+ assertTrue("File should exist " + fileB, fileB.exists());
}
+
+ @Test
+ public void testBrowsableThreeFilesRecursive() throws Exception {
+ template.sendBodyAndHeader(getFtpUrl(), "A", Exchange.FILE_NAME, "a.txt");
+ template.sendBodyAndHeader(getFtpUrl(), "B", Exchange.FILE_NAME, "foo/b.txt");
+ template.sendBodyAndHeader(getFtpUrl(), "C", Exchange.FILE_NAME, "bar/c.txt");
+
+ FtpEndpoint endpoint = context.getEndpoint(getFtpUrl() + "&recursive=true&sortBy=file:name", FtpEndpoint.class);
+ assertNotNull(endpoint);
+
+ MemoryIdempotentRepository repo = (MemoryIdempotentRepository) endpoint.getInProgressRepository();
+ assertEquals(0, repo.getCacheSize());
+
+ List<Exchange> list = endpoint.getExchanges();
+ assertNotNull(list);
+ assertEquals(3, list.size());
+
+ assertEquals("a.txt", list.get(0).getIn().getHeader(Exchange.FILE_NAME));
+ assertEquals("c.txt", list.get(1).getIn().getHeader(Exchange.FILE_NAME_ONLY));
+ assertEquals("b.txt", list.get(2).getIn().getHeader(Exchange.FILE_NAME_ONLY));
+
+ // the in progress repo should not leak
+ assertEquals(0, repo.getCacheSize());
+
+ // and the files are still there
+ File fileA = new File(FTP_ROOT_DIR + "browse/a.txt");
+ assertTrue("File should exist " + fileA, fileA.exists());
+ File fileB = new File(FTP_ROOT_DIR + "browse/foo/b.txt");
+ assertTrue("File should exist " + fileB, fileB.exists());
+ File fileC = new File(FTP_ROOT_DIR + "browse/bar/c.txt");
+ assertTrue("File should exist " + fileC, fileC.exists());
+ }
+
}
\ No newline at end of file