You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2008/11/29 18:29:58 UTC

svn commit: r721695 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/component/file/ test/java/org/apache/camel/component/file/

Author: davsclaus
Date: Sat Nov 29 09:29:56 2008
New Revision: 721695

URL: http://svn.apache.org/viewvc?rev=721695&view=rev
Log:
CAMEL-1112: Refactored file consumer. Removed @deprecated options. Added idempotent option. Work in progress as sorting and filtering are still needed.

Added:
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerBatchTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java   (contents, props changed)
      - copied, changed from r721650, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeFalseTest.java
Removed:
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeFalseTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRouteGenerateEmptyExchangeWhenIdleTest.java
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileComponent.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/MoveFilesToDirectoryTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/MoveFilesToDirectoryWithNoopTest.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileComponent.java?rev=721695&r1=721694&r2=721695&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileComponent.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileComponent.java Sat Nov 29 09:29:56 2008
@@ -22,6 +22,7 @@
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.impl.DefaultComponent;
+import org.apache.camel.processor.idempotent.MessageIdRepository;
 
 /**
  * The <a href="http://activemq.apache.org/camel/file.html">File Component</a>
@@ -42,6 +43,16 @@
      */
     public static final String HEADER_FILE_NAME_PRODUCED = "org.apache.camel.file.name.produced";
 
+    /**
+     * Header key holding the current (zero-based) index of the file within the batch being consumed
+     */
+    public static final String HEADER_FILE_BATCH_INDEX = "org.apache.camel.file.index";
+
+    /**
+     * Header key holding the total number of files in the batch being consumed
+     */
+    public static final String HEADER_FILE_BATCH_TOTAL = "org.apache.camel.file.total";
+
     public FileComponent() {
     }
 
@@ -52,6 +63,14 @@
     protected Endpoint createEndpoint(String uri, String remaining, Map parameters) throws Exception {
         File file = new File(remaining);
         FileEndpoint result = new FileEndpoint(file, uri, this);
+
+        // lookup idempotent repository in registry if provided
+        String ref = getAndRemoveParameter(parameters, "idempotentRepositoryRef", String.class);
+        if (ref != null) {
+            MessageIdRepository repository = mandatoryLookup(ref, MessageIdRepository.class);
+            result.setIdempotentRepository(repository);
+        }
+
         setProperties(result, parameters);
         return result;
     }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java?rev=721695&r1=721694&r2=721695&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java Sat Nov 29 09:29:56 2008
@@ -21,7 +21,8 @@
 import java.io.RandomAccessFile;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileLock;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Processor;
@@ -40,17 +41,6 @@
     private static final transient Log LOG = LogFactory.getLog(FileConsumer.class);
 
     private FileEndpoint endpoint;
-    private ConcurrentHashMap<File, File> filesBeingProcessed = new ConcurrentHashMap<File, File>();
-    private ConcurrentHashMap<File, Long> fileSizes = new ConcurrentHashMap<File, Long>();
-    private ConcurrentHashMap<File, Long> noopMap = new ConcurrentHashMap<File, Long>();
-
-    // the options below is @deprecated and will be removed in Camel 2.0
-    private long lastPollTime;
-    private int unchangedDelay;
-    private boolean unchangedSize;
-    private boolean generateEmptyExchangeWhenIdle;
-    private boolean alwaysConsume;
-
     private boolean recursive;
     private String regexPattern = "";
     private boolean exclusiveReadLock = true;
@@ -61,80 +51,68 @@
     }
 
     protected synchronized void poll() throws Exception {
-        // should be true the first time as its the top directory
-        int rc = pollFileOrDirectory(endpoint.getFile(), true);
-
-        // if no files consumes and using generateEmptyExchangeWhenIdle option then process an empty exchange 
-        if (rc == 0 && generateEmptyExchangeWhenIdle) {
-            final FileExchange exchange = endpoint.createExchange((File)null);
-            getAsyncProcessor().process(exchange, new AsyncCallback() {
-                public void done(boolean sync) {
-                }
-            });
+        // gather list of files to process
+        List<File> files = new ArrayList<File>();
+        filesToPoll(endpoint.getFile(), true, files);
+
+        // TODO allow reordering of files CAMEL-1112
+
+        // consume files one by one
+        int total = files.size();
+        for (int index = 0; index < files.size(); index++) {
+            File file = files.get(index);
+            processFile(file, total, index);
         }
-
-        lastPollTime = System.currentTimeMillis();
     }
 
     /**
-     * Pools the given file or directory for files to process.
+     * Scans the given file or directory for files to process.
      *
-     * @param fileOrDirectory  file or directory
+     * @param fileOrDirectory  current file or directory when doing recursion
      * @param processDir  recursive
-     * @return the number of files processed or being processed async.
+     * @param fileList  current list of files gathered
      */
-    protected int pollFileOrDirectory(File fileOrDirectory, boolean processDir) {
+    protected void filesToPoll(File fileOrDirectory, boolean processDir, List<File> fileList) {
+        if (fileOrDirectory == null || !fileOrDirectory.exists()) {
+            // nothing to scan (null or does not exist) so skip it
+            return;
+        }
+
         if (!fileOrDirectory.isDirectory()) {
-            // process the file
-            return pollFile(fileOrDirectory);
+            addFile(fileOrDirectory, fileList);
         } else if (processDir) {
             // directory that can be recursive
-            int rc = 0;
-            if (isValidFile(fileOrDirectory)) {
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Polling directory " + fileOrDirectory);
-                }
-                File[] files = fileOrDirectory.listFiles();
-                for (File file : files) {
-                    rc += pollFileOrDirectory(file, isRecursive()); // self-recursion
-                }
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Polling directory " + fileOrDirectory);
+            }
+            File[] files = fileOrDirectory.listFiles();
+            for (File file : files) {
+                // recursively add the files
+                filesToPoll(file, isRecursive(), fileList);
             }
-            return rc;
         } else {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Skipping directory " + fileOrDirectory);
             }
-            return 0;
         }
     }
 
     /**
-     * Polls the given file
+     * Processes the given file
      *
      * @param file  the file
-     * @return returns 1 if the file was processed, 0 otherwise.
+     * @param total  total number of files in this batch
+     * @param index  current index out of total in this batch                                      
      */
-    protected int pollFile(final File file) {
+    protected void processFile(final File file, int total, int index) {
         if (LOG.isTraceEnabled()) {
-            LOG.trace("Polling file: " + file);
-        }
-
-        if (!file.exists()) {
-            return 0;
-        }
-        if (!isValidFile(file)) {
-            return 0;
-        }
-        // we only care about file modified times if we are not deleting/moving files
-        if (!endpoint.isNoop()) {
-            if (filesBeingProcessed.contains(file)) {
-                return 1;
-            }
-            filesBeingProcessed.put(file, file);
+            LOG.trace("Processing file: " + file);
         }
 
         final FileProcessStrategy processStrategy = endpoint.getFileStrategy();
         final FileExchange exchange = endpoint.createExchange(file);
+        exchange.getIn().setHeader(FileComponent.HEADER_FILE_BATCH_TOTAL, total);
+        exchange.getIn().setHeader(FileComponent.HEADER_FILE_BATCH_INDEX, index);
 
         endpoint.configureMessage(file, exchange.getIn());
         try {
@@ -173,21 +151,16 @@
                             if (!committed) {
                                 processStrategyRollback(processStrategy, exchange, file);
                             }
-                            filesBeingProcessed.remove(file);
                         }
                     }
                 });
 
             } else {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug(endpoint + " can not process file: " + file);
-                }
+                LOG.warn(endpoint + " can not process file: " + file);
             }
-        } catch (Throwable e) {
+        } catch (Exception e) {
             handleException(e);
         }
-
-        return 1;
     }
 
     /**
@@ -252,74 +225,33 @@
         processStrategy.rollback(endpoint, exchange, file);
     }
 
+    /**
+     * Strategy for validating if the given file should be included or not
+     * @param file  the file
+     * @return true to include the file, false to skip it
+     */
     protected boolean isValidFile(File file) {
-        boolean result = false;
-        if (file != null && file.exists()) {
-            // TODO: maybe use a configurable strategy instead of the hardcoded one based on last file change
-            if (isMatched(file) && (alwaysConsume || isChanged(file))) {
-                result = true;
-            }
-        }
-        return result;
-    }
-
-    protected boolean isChanged(File file) {
-        if (file == null) {
-            // Sanity check
+        // NOTE: contains() will also add the file name to the repository if it was a miss
+        if (endpoint.isIdempotent() && endpoint.getIdempotentRepository().contains(file.getName())) {
+            // skip as we have already processed it
             return false;
-        } else if (file.isDirectory()) {
-            // Allow recursive polling to descend into this directory
-            return true;
-        } else {
-            // @deprecated will be removed on Camel 2.0
-            // the code below is kinda hard to maintain. We should strive to remove
-            // this stuff in Camel 2.0 to keep this component simple and no surprises for end-users
-            // this stuff is not persistent so restarting Camel will reset the state
-            boolean lastModifiedCheck = false;
-            long modifiedDuration = 0;
-            if (getUnchangedDelay() > 0) {
-                modifiedDuration = System.currentTimeMillis() - file.lastModified();
-                lastModifiedCheck = modifiedDuration >= getUnchangedDelay();
-            }
-
-            long fileModified = file.lastModified();
-            Long previousModified = noopMap.get(file);
-            noopMap.put(file, fileModified);
-            if (previousModified == null || fileModified > previousModified) {
-                lastModifiedCheck = true;
-            }
-
-            boolean sizeCheck = false;
-            long sizeDifference = 0;
-            if (isUnchangedSize()) {
-                Long value = fileSizes.get(file);
-                if (value == null) {
-                    sizeCheck = true;
-                } else {
-                    sizeCheck = file.length() != value;
-                }
-            }
-
-            boolean answer = lastModifiedCheck || sizeCheck;
-
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("file:" + file + " isChanged:" + answer + " " + "sizeCheck:" + sizeCheck + "("
-                          + sizeDifference + ") " + "lastModifiedCheck:" + lastModifiedCheck + "("
-                          + modifiedDuration + ")");
-            }
-
-            if (isUnchangedSize()) {
-                if (answer) {
-                    fileSizes.put(file, file.length());
-                } else {
-                    fileSizes.remove(file);
-                }
-            }
-
-            return answer;
         }
+
+        return isMatched(file);
     }
 
+    /**
+     * Strategy to perform file matching based on endpoint configuration.
+     * <p/>
+     * Will always return false for certain files:
+     * <ul>
+     *    <li>Starting with a dot</li>
+     *    <li>lock files</li>
+     * </ul>
+     *
+     * @param file  the file
+     * @return true if the file is matched, false if not
+     */
     protected boolean isMatched(File file) {
         String name = file.getName();
 
@@ -357,6 +289,12 @@
         return true;
     }
 
+    private void addFile(File file, List<File> fileList) {
+        if (isValidFile(file)) {
+            fileList.add(file);
+        }
+    }
+
     public boolean isRecursive() {
         return this.recursive;
     }
@@ -373,39 +311,6 @@
         this.regexPattern = regexPattern;
     }
 
-    public boolean isGenerateEmptyExchangeWhenIdle() {
-        return generateEmptyExchangeWhenIdle;
-    }
-
-    /**
-     * @deprecated will be removed in Camel 2.0
-     */
-    public void setGenerateEmptyExchangeWhenIdle(boolean generateEmptyExchangeWhenIdle) {
-        this.generateEmptyExchangeWhenIdle = generateEmptyExchangeWhenIdle;
-    }
-
-    public int getUnchangedDelay() {
-        return unchangedDelay;
-    }
-
-    /**
-     * @deprecated will be removed in Camel 2.0
-     */
-    public void setUnchangedDelay(int unchangedDelay) {
-        this.unchangedDelay = unchangedDelay;
-    }
-
-    public boolean isUnchangedSize() {
-        return unchangedSize;
-    }
-
-    /**
-     * @deprecated will be removed in Camel 2.0
-     */
-    public void setUnchangedSize(boolean unchangedSize) {
-        this.unchangedSize = unchangedSize;
-    }
-
     public boolean isExclusiveReadLock() {
         return exclusiveReadLock;
     }
@@ -414,25 +319,4 @@
         this.exclusiveReadLock = exclusiveReadLock;
     }
 
-    public boolean isAlwaysConsume() {
-        return alwaysConsume;
-    }
-
-    /**
-     * @deprecated will be removed in Camel 2.0 (not needed when we get rid of last polltimestamp)
-     */
-    public void setAlwaysConsume(boolean alwaysConsume) {
-        this.alwaysConsume = alwaysConsume;
-    }
-
-    public boolean isTimestamp() {
-        return !alwaysConsume;
-    }
-
-    /**
-     * @deprecated will be removed in Camel 2.0 (not needed when we get rid of last polltimestamp)
-     */
-    public void setTimestamp(boolean timestamp) {
-        this.alwaysConsume = !timestamp;
-    }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java?rev=721695&r1=721694&r2=721695&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java Sat Nov 29 09:29:56 2008
@@ -31,6 +31,8 @@
 import org.apache.camel.Producer;
 import org.apache.camel.impl.ScheduledPollEndpoint;
 import org.apache.camel.language.simple.FileLanguage;
+import org.apache.camel.processor.idempotent.MemoryMessageIdRepository;
+import org.apache.camel.processor.idempotent.MessageIdRepository;
 import org.apache.camel.util.FactoryFinder;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.UuidGenerator;
@@ -49,6 +51,7 @@
     private static final transient Log LOG = LogFactory.getLog(FileEndpoint.class);
     private static final transient String DEFAULT_STRATEGYFACTORY_CLASS =
         "org.apache.camel.component.file.strategy.FileProcessStrategyFactory";
+    private static final transient int DEFAULT_IDEMPOTENT_CACHE_SIZE = 1000;
 
     private File file;
     private FileProcessStrategy fileProcessStrategy;
@@ -65,6 +68,8 @@
     private boolean ignoreFileNameHeader;
     private Expression expression;
     private String tempPrefix;
+    private boolean idempotent;
+    private MessageIdRepository idempotentRepository;
 
     protected FileEndpoint(File file, String endpointUri, FileComponent component) {
         super(endpointUri, component);
@@ -89,6 +94,13 @@
 
     public Consumer createConsumer(Processor processor) throws Exception {
         Consumer result = new FileConsumer(this, processor);
+
+        // if idempotent and no repository set then create a default one
+        if (isIdempotent() && idempotentRepository == null) {
+            LOG.info("Using default memory based idempotent repository with cache max size: " + DEFAULT_IDEMPOTENT_CACHE_SIZE);
+            idempotentRepository = MemoryMessageIdRepository.memoryMessageIdRepository(DEFAULT_IDEMPOTENT_CACHE_SIZE);
+        }
+
         configureConsumer(result);
         return result;
     }
@@ -306,6 +318,22 @@
         this.tempPrefix = tempPrefix;
     }
 
+    public boolean isIdempotent() {
+        return idempotent;
+    }
+
+    public void setIdempotent(boolean idempotent) {
+        this.idempotent = idempotent;
+    }
+
+    public MessageIdRepository getIdempotentRepository() {
+        return idempotentRepository;
+    }
+
+    public void setIdempotentRepository(MessageIdRepository idempotentRepository) {
+        this.idempotentRepository = idempotentRepository;
+    }
+
     /**
      * A strategy method to lazily create the file strategy
      */

Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerBatchTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerBatchTest.java?rev=721695&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerBatchTest.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerBatchTest.java Sat Nov 29 09:29:56 2008
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * Unit test for consuming a batch of files (multiple files in one consume)
+ */
+public class FileConsumerBatchTest extends ContextTestSupport {
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        deleteDirectory("target/idempotent");
+        template.sendBodyAndHeader("file://target/file-batch/", "Hello World", FileComponent.HEADER_FILE_NAME, "hello.txt");
+        template.sendBodyAndHeader("file://target/file-batch/", "Bye World", FileComponent.HEADER_FILE_NAME, "bye.txt");
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("file://target/file-batch?consumer.delay=1000").to("mock:result");
+            }
+        };
+    }
+
+    public void testConsumeBatch() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceivedInAnyOrder("Hello World", "Bye World");
+
+        // test header keys
+        mock.message(0).header(FileComponent.HEADER_FILE_BATCH_TOTAL).isEqualTo(2);
+        mock.message(0).header(FileComponent.HEADER_FILE_BATCH_INDEX).isEqualTo(0);
+        mock.message(1).header(FileComponent.HEADER_FILE_BATCH_INDEX).isEqualTo(1);
+
+        assertMockEndpointsSatisfied();
+    }
+
+}
\ No newline at end of file

Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java?rev=721695&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java Sat Nov 29 09:29:56 2008
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import java.io.File;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.processor.idempotent.MessageIdRepository;
+
+/**
+ * Unit test for the idempotentRepositoryRef option.
+ */
+public class FileConsumerIdempotentRefTest extends ContextTestSupport {
+
+    private static boolean invoked;
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        jndi.bind("myRepo", new MyIdempotentRepository());
+        return jndi;
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        deleteDirectory("target/idempotent");
+        template.sendBodyAndHeader("file://target/idempotent/", "Hello World", FileComponent.HEADER_FILE_NAME, "report.txt");
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("file://target/idempotent/?idempotent=true&idempotentRepositoryRef=myRepo&moveNamePrefix=done/").to("mock:result");
+            }
+        };
+    }
+
+    public void testIdempotentRef() throws Exception {
+        // consume the file the first time
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World");
+        mock.expectedMessageCount(1);
+
+        assertMockEndpointsSatisfied();
+
+        Thread.sleep(1000);
+
+        // reset mock and set new expectations
+        mock.reset();
+        mock.expectedMessageCount(0);
+
+        // move file back
+        File file = new File("target/idempotent/done/report.txt");
+        File renamed = new File("target/idempotent/report.txt");
+        file = file.getAbsoluteFile();
+        file.renameTo(renamed.getAbsoluteFile());
+
+        // should NOT consume the file again, let 2 secs pass to let the consumer try to consume it but it should not
+        Thread.sleep(2000);
+        assertMockEndpointsSatisfied();
+
+        assertTrue("MyIdempotentRepository should have been invoked", invoked);
+    }
+
+    public class MyIdempotentRepository implements MessageIdRepository {
+
+        public boolean contains(String messageId) {
+            // will return false 1st time, and true 2nd time
+            boolean result = invoked;
+            invoked = true;
+            assertEquals("report.txt", messageId);
+            return result;
+        }
+    }
+    
+}
\ No newline at end of file

Copied: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java (from r721650, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeFalseTest.java)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java?p2=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java&p1=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeFalseTest.java&r1=721650&r2=721695&rev=721695&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeFalseTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java Sat Nov 29 09:29:56 2008
@@ -23,27 +23,27 @@
 import org.apache.camel.component.mock.MockEndpoint;
 
 /**
- * Unit test for the alwaysConsume=false option.
+ * Unit test for the idempotent=true option.
  */
-public class FileAlwaysConsumeFalseTest extends ContextTestSupport {
+public class FileConsumerIdempotentTest extends ContextTestSupport {
 
     @Override
     protected void setUp() throws Exception {
         super.setUp();
-        deleteDirectory("target/alwaysconsume");
-        template.sendBodyAndHeader("file://target/alwaysconsume/", "Hello World", FileComponent.HEADER_FILE_NAME, "report.txt");
+        deleteDirectory("target/idempotent");
+        template.sendBodyAndHeader("file://target/idempotent/", "Hello World", FileComponent.HEADER_FILE_NAME, "report.txt");
     }
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
-                from("file://target/alwaysconsume/?consumer.alwaysConsume=false&moveNamePrefix=done/").to("mock:result");
+                from("file://target/idempotent/?idempotent=true&moveNamePrefix=done/").to("mock:result");
             }
         };
     }
 
-    public void testNotAlwaysConsume() throws Exception {
+    public void testIdempotent() throws Exception {
         // consume the file the first time
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedBodiesReceived("Hello World");
@@ -58,12 +58,12 @@
         mock.expectedMessageCount(0);
 
         // move file back
-        File file = new File("target/alwaysconsume/done/report.txt");
-        File renamed = new File("target/alwaysconsume/report.txt");
+        File file = new File("target/idempotent/done/report.txt");
+        File renamed = new File("target/idempotent/report.txt");
         file = file.getAbsoluteFile();
         file.renameTo(renamed.getAbsoluteFile());
 
-        // should NOT consume the file again, let 2 secs pass to let the consuemr try to consume it but it should not
+        // should NOT consume the file again, let 2 secs pass to let the consumer try to consume it but it should not
         Thread.sleep(2000);
         assertMockEndpointsSatisfied();
     }

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java
------------------------------------------------------------------------------
    svn:mergeinfo = 

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteTest.java?rev=721695&r1=721694&r2=721695&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteTest.java Sat Nov 29 09:29:56 2008
@@ -24,6 +24,7 @@
  * @version $Revision$
  */
 public class FileDeleteRouteTest extends FileRouteTest {
+
     @Override
     protected void setUp() throws Exception {
         targetdir = "target/test-delete-inbox";
@@ -33,17 +34,19 @@
 
     @Override
     public void testFileRoute() throws Exception {
+        deleteDirectory("target/test-delete-inbox");
+        
         MockEndpoint result = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
         result.expectedBodiesReceived(expectedBody);
-        result.setResultWaitTime(5000);
 
         template.sendBodyAndHeader(uri, expectedBody, "cheese", 123);
-        Thread.sleep(4000);
-
         result.assertIsSatisfied();
 
+        Thread.sleep(100);
+
         for (String lockName : recorder.getLocks()) {
             File lock = new File(lockName);
+            lock = lock.getAbsoluteFile();
             assertFalse(lock.exists());
         }
     }

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/MoveFilesToDirectoryTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/MoveFilesToDirectoryTest.java?rev=721695&r1=721694&r2=721695&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/MoveFilesToDirectoryTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/MoveFilesToDirectoryTest.java Sat Nov 29 09:29:56 2008
@@ -41,13 +41,13 @@
         result.setResultWaitTime(5000);
 
         // now lets wait a bit and move that file
-        Thread.sleep(5000);
+        Thread.sleep(2000);
 
         // lets delete the output directory
         deleteDirectory(outputDirectory);
 
         // now lets wait a bit for it to be polled
-        Thread.sleep(5000);
+        Thread.sleep(2000);
 
         File file = new File(inputDirectory + "/" + fileName);
 
@@ -66,7 +66,7 @@
         assertFileExists(newFile);
 
         // now lets wait for multiple polls to check we only process it once
-        Thread.sleep(5000);
+        Thread.sleep(3000);
 
         assertMockEndpointsSatisfied();
     }

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/MoveFilesToDirectoryWithNoopTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/MoveFilesToDirectoryWithNoopTest.java?rev=721695&r1=721694&r2=721695&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/MoveFilesToDirectoryWithNoopTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/MoveFilesToDirectoryWithNoopTest.java Sat Nov 29 09:29:56 2008
@@ -23,6 +23,6 @@
 
     @Override
     protected String getOutputEndpointUri() {
-        return super.getOutputEndpointUri() + "?noop=true";
+        return super.getOutputEndpointUri() + "?noop=true&idempotent=true";
     }
 }