You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/05/04 19:59:40 UTC

[1/6] camel git commit: CAMEL-8688: Stream cache now keeps track on number of copies that was spooled to disk, so when the temp file is deleted its only deleted when no longer in use. A file can be shared if using wire tap etc. Thanks to Franz Forsthofer

Repository: camel
Updated Branches:
  refs/heads/master cfdf18542 -> d1121a7b4


CAMEL-8688: Stream cache now keeps track on number of copies that was spooled to disk, so when the temp file is deleted its only deleted when no longer in use. A file can be shared if using wire tap etc. Thanks to Franz Forsthofer for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0fa7d693
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0fa7d693
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0fa7d693

Branch: refs/heads/master
Commit: 0fa7d6935b890cf8020dffede6145fe8fb7d376f
Parents: a535a47
Author: Claus Ibsen <da...@apache.org>
Authored: Mon May 4 19:31:15 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon May 4 19:31:30 2015 +0200

----------------------------------------------------------------------
 .../component/netty4/http/NettyChannelBufferStreamCache.java      | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0fa7d693/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
index d8a183f..70635f0 100644
--- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
+++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
@@ -21,6 +21,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 
 import io.netty.buffer.ByteBuf;
+import org.apache.camel.Exchange;
 import org.apache.camel.StreamCache;
 import org.apache.camel.util.IOHelper;
 
@@ -87,7 +88,7 @@ public final class NettyChannelBufferStreamCache extends InputStream implements
     }
 
     @Override
-    public StreamCache copy() throws IOException {
+    public StreamCache copy(Exchange exchange) throws IOException {
         return new NettyChannelBufferStreamCache(buffer.copy());
     }
 


[2/6] camel git commit: CAMEL-8688: Stream cache now keeps track on number of copies that was spooled to disk, so when the temp file is deleted its only deleted when no longer in use. A file can be shared if using wire tap etc. Thanks to Franz Forsthofer

Posted by da...@apache.org.
CAMEL-8688: Stream cache now keeps track on number of copies that was spooled to disk, so when the temp file is deleted its only deleted when no longer in use. A file can be shared if using wire tap etc. Thanks to Franz Forsthofer for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a99f6d57
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a99f6d57
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a99f6d57

Branch: refs/heads/master
Commit: a99f6d5710c3068fa3fe841d4f80dc82deb1142b
Parents: cfdf185
Author: Claus Ibsen <da...@apache.org>
Authored: Mon May 4 19:09:51 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon May 4 19:31:30 2015 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/camel/StreamCache.java |   6 +-
 .../stream/ByteArrayInputStreamCache.java       |   3 +-
 .../converter/stream/CachedOutputStream.java    | 121 ++-----------
 .../converter/stream/FileInputStreamCache.java  | 173 +++++++++++++++++--
 .../converter/stream/InputStreamCache.java      |   3 +-
 .../camel/converter/stream/ReaderCache.java     |   3 +-
 .../camel/converter/stream/SourceCache.java     |   3 +-
 .../converter/stream/StreamSourceCache.java     |   6 +-
 .../camel/processor/MulticastProcessor.java     |   2 +-
 .../camel/processor/WireTapProcessor.java       |   2 +-
 .../processor/WireTapStreamCachingTest.java     |  19 +-
 .../apache/camel/util/MessageHelperTest.java    |   2 +-
 .../apache/camel/processor/twoCharacters.txt    |   1 +
 .../http/NettyChannelBufferStreamCache.java     |   3 +-
 14 files changed, 207 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/StreamCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/StreamCache.java b/camel-core/src/main/java/org/apache/camel/StreamCache.java
index ecd9736..29f4284 100644
--- a/camel-core/src/main/java/org/apache/camel/StreamCache.java
+++ b/camel-core/src/main/java/org/apache/camel/StreamCache.java
@@ -29,7 +29,7 @@ import java.io.OutputStream;
  * The Camel routing engine uses the {@link org.apache.camel.processor.CamelInternalProcessor.StreamCachingAdvice}
  * to apply the stream cache during routing.
  * <p/>
- * It is recommended in the {@link #copy()} method to let the copied stream start from the start. If the implementation
+ * It is recommended in the {@link #copy(Exchange)} method to let the copied stream start from the start. If the implementation
  * does not support copy, then return <tt>null</tt>.
  *
  * @version 
@@ -60,10 +60,12 @@ public interface StreamCache {
      * Implementations note: A copy of the stream is recommended to read from the start
      * of the stream.
      *
+     * @param exchange exchange in which the stream cache object is used; 
+     *                 can be used to delete resources of the stream cache when the exchange is completed
      * @return a copy, or <tt>null</tt> if copy is not possible
      * @throws java.io.IOException is thrown if the copy fails
      */
-    StreamCache copy() throws IOException;
+    StreamCache copy(Exchange exchange) throws IOException;
 
     /**
      * Whether this {@link StreamCache} is in memory only or

http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java
index 3b1dacf..9375ee3 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java
@@ -22,6 +22,7 @@ import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.StreamCache;
 import org.apache.camel.util.IOHelper;
 
@@ -51,7 +52,7 @@ public class ByteArrayInputStreamCache extends FilterInputStream implements Stre
         IOHelper.copyAndCloseInput(in, os);
     }
 
-    public StreamCache copy() throws IOException {
+    public StreamCache copy(Exchange exchange) throws IOException {
         if (byteArrayForCopy == null) {
             ByteArrayOutputStream baos = new ByteArrayOutputStream(in.available());
             IOHelper.copy(in, baos);

http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
index 639e339..d722baf 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
@@ -16,29 +16,15 @@
  */
 package org.apache.camel.converter.stream;
 
-import java.io.BufferedOutputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.security.GeneralSecurityException;
-
-import javax.crypto.CipherOutputStream;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.StreamCache;
-import org.apache.camel.converter.stream.FileInputStreamCache.FileInputStreamCloser;
+import org.apache.camel.converter.stream.FileInputStreamCache.TempFileManager;
 import org.apache.camel.spi.StreamCachingStrategy;
-import org.apache.camel.spi.Synchronization;
-import org.apache.camel.spi.UnitOfWork;
-import org.apache.camel.support.SynchronizationAdapter;
-import org.apache.camel.util.FileUtil;
-import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * This output stream will store the content into a File if the stream context size is exceed the
@@ -50,7 +36,7 @@ import org.slf4j.LoggerFactory;
  * <p/>
  * You can get a cached input stream of this stream. The temp file which is created with this 
  * output stream will be deleted when you close this output stream or the cached 
- * fileInputStream(s) is/are closed after the exchange is completed.
+ * fileInputStream(s) is/are closed after all the exchanges using the temp file are completed.
  */
 public class CachedOutputStream extends OutputStream {
     @Deprecated
@@ -61,16 +47,12 @@ public class CachedOutputStream extends OutputStream {
     public static final String TEMP_DIR = "CamelCachedOutputStreamOutputDirectory";
     @Deprecated
     public static final String CIPHER_TRANSFORMATION = "CamelCachedOutputStreamCipherTransformation";
-    private static final Logger LOG = LoggerFactory.getLogger(CachedOutputStream.class);
 
     private final StreamCachingStrategy strategy;
     private OutputStream currentStream;
     private boolean inMemory = true;
     private int totalLength;
-    private File tempFile;
-    private FileInputStreamCache fileInputStreamCache;
-    private final FileInputStreamCloser fileInputStreamCloser = new FileInputStreamCloser();
-    private CipherPair ciphers;
+    private final TempFileManager tempFileManager;
     private final boolean closedOnCompletion;
 
     public CachedOutputStream(Exchange exchange) {
@@ -79,44 +61,10 @@ public class CachedOutputStream extends OutputStream {
 
     public CachedOutputStream(Exchange exchange, final boolean closedOnCompletion) {
         this.closedOnCompletion = closedOnCompletion;
+        tempFileManager = new TempFileManager(closedOnCompletion);
+        tempFileManager.addExchange(exchange);
         this.strategy = exchange.getContext().getStreamCachingStrategy();
         currentStream = new CachedByteArrayOutputStream(strategy.getBufferSize());
-        if (closedOnCompletion) {
-            // add on completion so we can cleanup after the exchange is done such as deleting temporary files
-            Synchronization onCompletion = new SynchronizationAdapter() {
-                @Override
-                public void onDone(Exchange exchange) {
-                    try {
-                        closeFileInputStreams();
-                        close();
-                        try {
-                            cleanUpTempFile();
-                        } catch (Exception e) {
-                            LOG.warn("Error deleting temporary cache file: " + tempFile + ". This exception will be ignored.", e);
-                        }
-                    } catch (Exception e) {
-                        LOG.warn("Error closing streams. This exception will be ignored.", e);
-                    }
-                }
-
-                @Override
-                public String toString() {
-                    return "OnCompletion[CachedOutputStream]";
-                }
-            };
-
-            UnitOfWork streamCacheUnitOfWork = exchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, UnitOfWork.class);
-            if (streamCacheUnitOfWork != null) {
-                // The stream cache must sometimes not be closed when the exchange is deleted. This is for example the
-                // case in the splitter and multi-cast case with AggregationStrategy where the result of the sub-routes
-                // are aggregated later in the main route. Here, the cached streams of the sub-routes must be closed with
-                // the Unit of Work of the main route.
-                streamCacheUnitOfWork.addSynchronization(onCompletion);
-            } else {
-                // add on completion so we can cleanup after the exchange is done such as deleting temporary files
-                exchange.addOnCompletion(onCompletion);
-            }
-        }
     }
 
     public void flush() throws IOException {
@@ -127,12 +75,8 @@ public class CachedOutputStream extends OutputStream {
         currentStream.close();
         // need to clean up the temp file this time
         if (!closedOnCompletion) {
-            closeFileInputStreams();
-            try {
-                cleanUpTempFile();
-            } catch (Exception e) {
-                LOG.warn("Error deleting temporary cache file: " + tempFile + ". This exception will be ignored.", e);
-            }
+            tempFileManager.closeFileInputStreams();
+            tempFileManager.cleanUpTempFile();
         }
     }
 
@@ -206,40 +150,17 @@ public class CachedOutputStream extends OutputStream {
                 throw new IllegalStateException("CurrentStream should be an instance of CachedByteArrayOutputStream but is: " + currentStream.getClass().getName());
             }
         } else {
-            try {
-                if (fileInputStreamCache == null) {
-                    fileInputStreamCache = new FileInputStreamCache(tempFile, ciphers, fileInputStreamCloser);
-                }
-                return fileInputStreamCache;
-            } catch (FileNotFoundException e) {
-                throw new IOException("Cached file " + tempFile + " not found", e);
-            }
+            return tempFileManager.newStreamCache();
         }
     }
     
-    private void closeFileInputStreams() {
-        fileInputStreamCloser.close();
-        fileInputStreamCache = null;
-    } 
-
-    private void cleanUpTempFile() {
-        // cleanup temporary file
-        if (tempFile != null) {
-            FileUtil.deleteFile(tempFile);
-            tempFile = null;
-        }
-    }
 
     private void pageToFileStream() throws IOException {
         flush();
-
         ByteArrayOutputStream bout = (ByteArrayOutputStream)currentStream;
-        tempFile = FileUtil.createTempFile("cos", ".tmp", strategy.getSpoolDirectory());
-
-        LOG.trace("Creating temporary stream cache file: {}", tempFile);
-
         try {
-            currentStream = createOutputStream(tempFile);
+            // creates an tmp file and a file output stream
+            currentStream = tempFileManager.createOutputStream(strategy);
             bout.writeTo(currentStream);
         } finally {
             // ensure flag is flipped to file based
@@ -291,26 +212,4 @@ public class CachedOutputStream extends OutputStream {
         }
     }
 
-    private OutputStream createOutputStream(File file) throws IOException {
-        OutputStream out = new BufferedOutputStream(new FileOutputStream(file));
-        if (ObjectHelper.isNotEmpty(strategy.getSpoolChiper())) {
-            try {
-                if (ciphers == null) {
-                    ciphers = new CipherPair(strategy.getSpoolChiper());
-                }
-            } catch (GeneralSecurityException e) {
-                throw new IOException(e.getMessage(), e);
-            }
-            out = new CipherOutputStream(out, ciphers.getEncryptor()) {
-                boolean closed;
-                public void close() throws IOException {
-                    if (!closed) {
-                        super.close();
-                        closed = true;
-                    }
-                }
-            };
-        }
-        return out;
-    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
index c6e99ac..a0d6501 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
@@ -17,44 +17,60 @@
 package org.apache.camel.converter.stream;
 
 import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.channels.Channels;
 import java.nio.channels.FileChannel;
 import java.nio.channels.WritableByteChannel;
+import java.security.GeneralSecurityException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import javax.crypto.CipherInputStream;
+import javax.crypto.CipherOutputStream;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.StreamCache;
+import org.apache.camel.spi.StreamCachingStrategy;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.spi.UnitOfWork;
+import org.apache.camel.support.SynchronizationAdapter;
+import org.apache.camel.util.FileUtil;
 import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A {@link StreamCache} for {@link File}s
  */
 public final class FileInputStreamCache extends InputStream implements StreamCache {
     private InputStream stream;
+    private final long length;
+    private final FileInputStreamCache.TempFileManager tempFileManager;
     private final File file;
     private final CipherPair ciphers;
-    private final long length;
-    private final FileInputStreamCache.FileInputStreamCloser closer;
 
+    /** Only for testing purposes.*/
     public FileInputStreamCache(File file) throws FileNotFoundException {
-        this(file, null, new FileInputStreamCloser());
+        this(new TempFileManager(file, true));
     }
     
-    FileInputStreamCache(File file, CipherPair ciphers, FileInputStreamCloser closer) throws FileNotFoundException {
-        this.file = file;
+    FileInputStreamCache(TempFileManager closer) throws FileNotFoundException {
+        this.file = closer.getTempFile();
         this.stream = null;
-        this.ciphers = ciphers;
+        this.ciphers = closer.getCiphers();
         this.length = file.length();
-        this.closer = closer;
-        this.closer.add(this);
+        this.tempFileManager = closer;
+        this.tempFileManager.add(this);
     }
     
     @Override
@@ -99,8 +115,9 @@ public final class FileInputStreamCache extends InputStream implements StreamCac
         }
     }
 
-    public StreamCache copy() throws IOException {
-        FileInputStreamCache copy = new FileInputStreamCache(file, ciphers, closer);
+    public StreamCache copy(Exchange exchange) throws IOException {
+        tempFileManager.addExchange(exchange);
+        FileInputStreamCache copy = new FileInputStreamCache(tempFileManager);
         return copy;
     }
 
@@ -146,16 +163,37 @@ public final class FileInputStreamCache extends InputStream implements StreamCac
     }
 
     /** 
-     * Collects all FileInputStreamCache instances of a temporary file which must be closed
-     * at the end of the route.
+     * Manages the temporary file for the file input stream caches.
+     * 
+     * Collects all FileInputStreamCache instances of the temporary file.
+     * Counts the number of exchanges which have a FileInputStreamCache  instance of the temporary file.
+     * Deletes the temporary file, if all exchanges are done.
      * 
      * @see CachedOutputStream
      */
-    static class FileInputStreamCloser {
+    static class TempFileManager {
+        
+        private static final Logger LOG = LoggerFactory.getLogger(TempFileManager.class);
+        /** Indicator whether the file input stream caches are closed on completion of the exchanges. */
+        private final boolean closedOnCompletion;
+        private AtomicInteger exchangeCounter = new AtomicInteger();
+        private File tempFile;
+        private OutputStream outputStream; // file output stream
+        private CipherPair ciphers;
         
-        // there can be several input streams, for example in the multi-cast parallel processing
+        // there can be several input streams, for example in the multi-cast, or wiretap parallel processing
         private List<FileInputStreamCache> fileInputStreamCaches;
+
+        /** Only for testing.*/
+        private TempFileManager(File file, boolean closedOnCompletion) {
+            this(closedOnCompletion);
+            this.tempFile = file;
+        }
         
+        TempFileManager(boolean closedOnCompletion) {
+            this.closedOnCompletion = closedOnCompletion;
+        }
+                
         /** Adds a FileInputStreamCache instance to the closer.
          * <p>
          * Must be synchronized, because can be accessed by several threads. 
@@ -167,14 +205,119 @@ public final class FileInputStreamCache extends InputStream implements StreamCac
             fileInputStreamCaches.add(fileInputStreamCache);
         }
         
-        void close() {
+        void addExchange(Exchange exchange) {
+            if (closedOnCompletion) {
+                exchangeCounter.incrementAndGet();
+                // add on completion so we can cleanup after the exchange is done such as deleting temporary files
+                Synchronization onCompletion = new SynchronizationAdapter() {
+                    @Override
+                    public void onDone(Exchange exchange) {
+                        int actualExchanges = exchangeCounter.decrementAndGet();
+                        if (actualExchanges == 0) {
+                            // only one exchange (one thread) left, therefore we must not synchronize the following lines of code
+                            try {                              
+                                closeFileInputStreams();
+                                if (outputStream != null) {
+                                    outputStream.close();
+                                }
+                                try {
+                                    cleanUpTempFile();
+                                } catch (Exception e) {
+                                    LOG.warn("Error deleting temporary cache file: " + tempFile + ". This exception will be ignored.", e);
+                                }
+                            } catch (Exception e) {
+                                LOG.warn("Error closing streams. This exception will be ignored.", e);
+                            }
+                        }
+                    }
+
+                    @Override
+                    public String toString() {
+                        return "OnCompletion[CachedOutputStream]";
+                    }
+                };
+                UnitOfWork streamCacheUnitOfWork = exchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, UnitOfWork.class);
+                if (streamCacheUnitOfWork != null) {
+                    // The stream cache must sometimes not be closed when the exchange is deleted. This is for example the
+                    // case in the splitter and multi-cast case with AggregationStrategy where the result of the sub-routes
+                    // are aggregated later in the main route. Here, the cached streams of the sub-routes must be closed with
+                    // the Unit of Work of the main route.
+                    streamCacheUnitOfWork.addSynchronization(onCompletion);
+                } else {
+                    // add on completion so we can cleanup after the exchange is done such as deleting temporary files
+                    exchange.addOnCompletion(onCompletion);
+                }
+            }
+        }
+        
+        OutputStream createOutputStream(StreamCachingStrategy strategy) throws IOException {
+            // should only be called once
+            if (tempFile != null) {
+                throw new IllegalStateException("The method 'createOutputStream' can only be called once!");
+            }
+            tempFile = FileUtil.createTempFile("cos", ".tmp", strategy.getSpoolDirectory());
+
+            LOG.trace("Creating temporary stream cache file: {}", tempFile);
+            OutputStream out = new BufferedOutputStream(new FileOutputStream(tempFile));
+            if (ObjectHelper.isNotEmpty(strategy.getSpoolChiper())) {
+                try {
+                    if (ciphers == null) {
+                        ciphers = new CipherPair(strategy.getSpoolChiper());
+                    }
+                } catch (GeneralSecurityException e) {
+                    throw new IOException(e.getMessage(), e);
+                }
+                out = new CipherOutputStream(out, ciphers.getEncryptor()) {
+                    boolean closed;
+                    public void close() throws IOException {
+                        if (!closed) {
+                            super.close();
+                            closed = true;
+                        }
+                    }
+                };
+            }
+            outputStream = out;
+            return out;
+        }
+        
+        FileInputStreamCache newStreamCache() throws IOException {
+            try {
+                return new FileInputStreamCache(this);
+            } catch (FileNotFoundException e) {
+                throw new IOException("Cached file " + tempFile + " not found", e);
+            }
+        }
+        
+        void closeFileInputStreams() {
             if (fileInputStreamCaches != null) {
                 for (FileInputStreamCache fileInputStreamCache : fileInputStreamCaches) {
                     fileInputStreamCache.close();
                 }
                 fileInputStreamCaches.clear();
             }
+        } 
+
+        void cleanUpTempFile() {
+            // cleanup temporary file
+            try {
+                if (tempFile != null) {
+                    FileUtil.deleteFile(tempFile);
+                    tempFile = null;
+                }
+            } catch (Exception e) {
+                LOG.warn("Error deleting temporary cache file: " + tempFile + ". This exception will be ignored.", e);
+            }
+        }
+        
+        File getTempFile() {
+            return tempFile;
+        }
+        
+        CipherPair getCiphers() {
+            return ciphers;
         }
+        
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
index ba7f12e..78422a7 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
@@ -20,6 +20,7 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.StreamCache;
 
 /**
@@ -40,7 +41,7 @@ public final class InputStreamCache extends ByteArrayInputStream implements Stre
         os.write(buf, pos, count - pos);
     }
 
-    public StreamCache copy() {
+    public StreamCache copy(Exchange exchange) {
         return new InputStreamCache(buf, count);
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
index bed761c..2890945 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.io.StringReader;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.StreamCache;
 
 /**
@@ -51,7 +52,7 @@ public class ReaderCache extends StringReader implements StreamCache {
         os.write(data.getBytes());
     }
 
-    public StreamCache copy() throws IOException {
+    public StreamCache copy(Exchange exchange) throws IOException {
         return new ReaderCache(data);
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
index 16f8422..4f00eb4 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
@@ -19,6 +19,7 @@ package org.apache.camel.converter.stream;
 import java.io.IOException;
 import java.io.OutputStream;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.StreamCache;
 import org.apache.camel.StringSource;
 import org.apache.camel.util.IOHelper;
@@ -44,7 +45,7 @@ public final class SourceCache extends StringSource implements StreamCache {
         IOHelper.copy(getInputStream(), os);
     }
 
-    public StreamCache copy() throws IOException {
+    public StreamCache copy(Exchange exchange) throws IOException {
         return new SourceCache(getText());
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
index 499f799..a7edfc9 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
@@ -83,12 +83,12 @@ public final class StreamSourceCache extends StreamSource implements StreamCache
         }
     }
 
-    public StreamCache copy() throws IOException {
+    public StreamCache copy(Exchange exchange) throws IOException {
         if (streamCache != null) {
-            return new StreamSourceCache(streamCache.copy());
+            return new StreamSourceCache(streamCache.copy(exchange));
         }
         if (readCache != null) {
-            return new StreamSourceCache(readCache.copy());
+            return new StreamSourceCache(readCache.copy(exchange));
         }
         return null;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 334ceb1..e4a2ef8 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -953,7 +953,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
                 if (index > 0) {
                     // copy it otherwise parallel processing is not possible,
                     // because streams can only be read once
-                    StreamCache copiedStreamCache = streamCache.copy();
+                    StreamCache copiedStreamCache = streamCache.copy(copy);
                     if (copiedStreamCache != null) {
                         copy.getIn().setBody(copiedStreamCache);  
                     }

http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index a74e663..1d6b835 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -180,7 +180,7 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor,
         if (msg.getBody() instanceof StreamCache) {
             // in parallel processing case, the stream must be copied, therefore get the stream
             StreamCache cache = (StreamCache) msg.getBody();
-            StreamCache copied = cache.copy();
+            StreamCache copied = cache.copy(answer);
             if (copied != null) {
                 msg.setBody(copied);
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/test/java/org/apache/camel/processor/WireTapStreamCachingTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/WireTapStreamCachingTest.java b/camel-core/src/test/java/org/apache/camel/processor/WireTapStreamCachingTest.java
index 0a87c13..1db7307 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/WireTapStreamCachingTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/WireTapStreamCachingTest.java
@@ -17,6 +17,7 @@
 package org.apache.camel.processor;
 
 import java.io.StringReader;
+
 import javax.xml.transform.stream.StreamSource;
 
 import org.apache.camel.ContextTestSupport;
@@ -26,6 +27,7 @@ import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
 
 /**
  * @version 
@@ -51,7 +53,19 @@ public class WireTapStreamCachingTest extends ContextTestSupport {
 
         assertMockEndpointsSatisfied();
     }
+    
+    @Test
+    public void testSendingAMessageUsingWiretapShouldNotDeleteStreamFileBeforeAllExcangesAreComplete() throws InterruptedException {
+
+        x.expectedMessageCount(1);
+        y.expectedMessageCount(1);
+        z.expectedMessageCount(1);
+
+        // the used file should contain more than one character in order to be streamed into the file system
+        template.sendBody("direct:a", this.getClass().getClassLoader().getResourceAsStream("org/apache/camel/processor/twoCharacters.txt"));
 
+        assertMockEndpointsSatisfied();
+    }
 
     @Override
     protected void setUp() throws Exception {
@@ -76,6 +90,8 @@ public class WireTapStreamCachingTest extends ContextTestSupport {
             public void configure() {
                 // enable stream caching
                 context.setStreamCaching(true);
+                // set stream threshold to 1, in order to stream into the file system
+                context.getStreamCachingStrategy().setSpoolThreshold(1);
 
                 errorHandler(deadLetterChannel("mock:error").redeliveryDelay(0).maximumRedeliveries(3));
 
@@ -83,7 +99,8 @@ public class WireTapStreamCachingTest extends ContextTestSupport {
                 from("direct:a").wireTap("direct:x").wireTap("direct:y").wireTap("direct:z");
 
                 from("direct:x").process(processor).to("mock:x");
-                from("direct:y").process(processor).to("mock:y");
+                // even if a process takes more time then the others the wire tap shall work
+                from("direct:y").delay(2000).process(processor).to("mock:y");
                 from("direct:z").process(processor).to("mock:z");
             }
         };

http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java b/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java
index 7448989..cf80614 100644
--- a/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java
+++ b/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java
@@ -63,7 +63,7 @@ public class MessageHelperTest extends TestCase {
                 // noop
             }
 
-            public StreamCache copy() throws IOException {
+            public StreamCache copy(Exchange exchange) throws IOException {
                 return null;
             }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/test/resources/org/apache/camel/processor/twoCharacters.txt
----------------------------------------------------------------------
diff --git a/camel-core/src/test/resources/org/apache/camel/processor/twoCharacters.txt b/camel-core/src/test/resources/org/apache/camel/processor/twoCharacters.txt
new file mode 100644
index 0000000..dfc9179
--- /dev/null
+++ b/camel-core/src/test/resources/org/apache/camel/processor/twoCharacters.txt
@@ -0,0 +1 @@
+AB
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java
index 8ecb8f7..b3afc4a 100644
--- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.StreamCache;
 import org.apache.camel.util.IOHelper;
 import org.jboss.netty.buffer.ChannelBuffer;
@@ -87,7 +88,7 @@ public final class NettyChannelBufferStreamCache extends InputStream implements
     }
 
     @Override
-    public StreamCache copy() throws IOException {
+    public StreamCache copy(Exchange exchange) throws IOException {
         return new NettyChannelBufferStreamCache(buffer.copy());
     }
 


[5/6] camel git commit: CAMEL-8688: Stream cache now keeps track on number of copies that was spooled to disk, so when the temp file is deleted its only deleted when no longer in use. A file can be shared if using wire tap etc. Thanks to Franz Forsthofer

Posted by da...@apache.org.
CAMEL-8688: Stream cache now keeps track on number of copies that was spooled to disk, so when the temp file is deleted its only deleted when no longer in use. A file can be shared if using wire tap etc. Thanks to Franz Forsthofer for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f1c5b547
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f1c5b547
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f1c5b547

Branch: refs/heads/master
Commit: f1c5b5472deca2f435e669c814655dd26e3512f6
Parents: 504b0d8
Author: Claus Ibsen <da...@apache.org>
Authored: Mon May 4 20:02:08 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon May 4 20:02:08 2015 +0200

----------------------------------------------------------------------
 .../camel/component/cxf/converter/CachedCxfPayload.java      | 8 ++++----
 .../camel/component/cxf/converter/CachedCxfPayloadTest.java  | 2 +-
 2 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/f1c5b547/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/converter/CachedCxfPayload.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/converter/CachedCxfPayload.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/converter/CachedCxfPayload.java
index d54a026..7cc94a6 100644
--- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/converter/CachedCxfPayload.java
+++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/converter/CachedCxfPayload.java
@@ -92,14 +92,14 @@ public class CachedCxfPayload<T> extends CxfPayload<T> implements StreamCache {
         }
     }
 
-    private CachedCxfPayload(CachedCxfPayload<T> orig) throws IOException {
+    private CachedCxfPayload(CachedCxfPayload<T> orig, Exchange exchange) throws IOException {
         super(orig.getHeaders(), new ArrayList<Source>(orig.getBodySources()), orig.getNsMap());
         ListIterator<Source> li = getBodySources().listIterator();
         this.xml = orig.xml;
         while (li.hasNext()) {
             Source source = li.next();
             if (source instanceof StreamCache) {
-                li.set((Source) (((StreamCache) source)).copy());
+                li.set((Source) (((StreamCache) source)).copy(exchange));
             }
         }
     }
@@ -149,8 +149,8 @@ public class CachedCxfPayload<T> extends CxfPayload<T> implements StreamCache {
     }
 
     @Override
-    public StreamCache copy() throws IOException {
-        return new CachedCxfPayload<T>(this);
+    public StreamCache copy(Exchange exchange) throws IOException {
+        return new CachedCxfPayload<T>(this, exchange);
     }
 
     private static class DelegatingXMLStreamReader implements XMLStreamReader {

http://git-wip-us.apache.org/repos/asf/camel/blob/f1c5b547/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CachedCxfPayloadTest.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CachedCxfPayloadTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CachedCxfPayloadTest.java
index 6c7618b..6eb096b 100644
--- a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CachedCxfPayloadTest.java
+++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CachedCxfPayloadTest.java
@@ -84,7 +84,7 @@ public class CachedCxfPayloadTest extends ExchangeTestSupport {
 
         cache.reset();
 
-        CachedCxfPayload clone = (CachedCxfPayload) cache.copy();
+        CachedCxfPayload clone = (CachedCxfPayload) cache.copy(exchange);
         bos = new ByteArrayOutputStream();
         clone.writeTo(bos);
 


[6/6] camel git commit: CAMEL-8727: File consumer - Add read lock that is based on idempotent repository

Posted by da...@apache.org.
CAMEL-8727: File consumer - Add read lock that is based on idempotent repository


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d1121a7b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d1121a7b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d1121a7b

Branch: refs/heads/master
Commit: d1121a7b4b90599b997f4df88b468cddcef18074
Parents: f1c5b54
Author: Claus Ibsen <da...@apache.org>
Authored: Mon May 4 20:02:17 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon May 4 20:02:17 2015 +0200

----------------------------------------------------------------------
 .../java/org/apache/camel/component/file/GenericFileEndpoint.java   | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d1121a7b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
index ae5381b..ea122cf 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
@@ -939,6 +939,7 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
      * This option applied only for readLock=idempotent.
      * This option allows to specify whether to remove the file name entry from the idempotent repository
      * when processing the file failed and a rollback happens.
+     * If this option is false, then the file name entry is confirmed (as if the file did a commit).
      */
     public void setReadLockRemoveOnRollback(boolean readLockRemoveOnRollback) {
         this.readLockRemoveOnRollback = readLockRemoveOnRollback;


[3/6] camel git commit: Fixed CS

Posted by da...@apache.org.
Fixed CS


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a535a474
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a535a474
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a535a474

Branch: refs/heads/master
Commit: a535a4746c1ec655fb45edd27f7a4be98d7a86fa
Parents: a99f6d5
Author: Claus Ibsen <da...@apache.org>
Authored: Mon May 4 19:28:23 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon May 4 19:31:30 2015 +0200

----------------------------------------------------------------------
 .../src/test/java/org/apache/camel/language/simple/Constants.java | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a535a474/camel-core/src/test/java/org/apache/camel/language/simple/Constants.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/language/simple/Constants.java b/camel-core/src/test/java/org/apache/camel/language/simple/Constants.java
index 6edb246..d65a196 100644
--- a/camel-core/src/test/java/org/apache/camel/language/simple/Constants.java
+++ b/camel-core/src/test/java/org/apache/camel/language/simple/Constants.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.language.simple;
 
+// CHECKSTYLE:OFF
+// we want to use the code as-is as that is how end users may code
 public class Constants {
 
     public static String BAR = "456";
@@ -26,3 +28,4 @@ public class Constants {
 
     }
 }
+// CHECKSTYLE:ON


[4/6] camel git commit: CAMEL-8727: File consumer - Add read lock that is based on idempotent repository

Posted by da...@apache.org.
CAMEL-8727: File consumer - Add read lock that is based on idempotent repository


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/504b0d84
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/504b0d84
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/504b0d84

Branch: refs/heads/master
Commit: 504b0d84078944c7e632316d57a58fcb220ca494
Parents: 0fa7d69
Author: Claus Ibsen <da...@apache.org>
Authored: Mon May 4 19:40:20 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon May 4 19:40:31 2015 +0200

----------------------------------------------------------------------
 .../component/file/GenericFileEndpoint.java     | 20 +++++++++++++++
 ...ileIdempotentRepositoryReadLockStrategy.java | 27 ++++++++++++++++++--
 .../strategy/FileProcessStrategyFactory.java    |  4 +++
 3 files changed, 49 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/504b0d84/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
index 9cd7b4f..ae5381b 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
@@ -173,6 +173,8 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
     @UriParam(label = "consumer", defaultValue = "true")
     protected boolean readLockRemoveOnRollback = true;
     @UriParam(label = "consumer")
+    protected boolean readLockRemoveOnCommit;
+    @UriParam(label = "consumer")
     protected GenericFileExclusiveReadLockStrategy<T> exclusiveReadLockStrategy;
     @UriParam(label = "consumer")
     protected ExceptionHandler onCompletionExceptionHandler;
@@ -942,6 +944,23 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
         this.readLockRemoveOnRollback = readLockRemoveOnRollback;
     }
 
+    public boolean isReadLockRemoveOnCommit() {
+        return readLockRemoveOnCommit;
+    }
+
+    /**
+     * This option applied only for readLock=idempotent.
+     * This option allows to specify whether to remove the file name entry from the idempotent repository
+     * when processing the file is succeeded and a commit happens.
+     * <p/>
+     * By default the file is not removed which ensures that any race-condition do not occur so another active
+     * node may attempt to grab the file. Instead the idempotent repository may support eviction strategies
+     * that you can configure to evict the file name entry after X minutes - this ensures no problems with race conditions.
+     */
+    public void setReadLockRemoveOnCommit(boolean readLockRemoveOnCommit) {
+        this.readLockRemoveOnCommit = readLockRemoveOnCommit;
+    }
+
     public int getBufferSize() {
         return bufferSize;
     }
@@ -1256,6 +1275,7 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
         params.put("readLockLoggingLevel", readLockLoggingLevel);
         params.put("readLockMinAge", readLockMinAge);
         params.put("readLockRemoveOnRollback", readLockRemoveOnRollback);
+        params.put("readLockRemoveOnCommit", readLockRemoveOnCommit);
         return params;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/504b0d84/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
index 763b7e0..b9cf193 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
@@ -47,6 +47,7 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp
     private CamelContext camelContext;
     private IdempotentRepository<String> idempotentRepository;
     private boolean removeOnRollback = true;
+    private boolean removeOnCommit;
 
     @Override
     public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) throws Exception {
@@ -91,8 +92,12 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp
     @Override
     public void releaseExclusiveReadLockOnCommit(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
         String key = asKey(file);
-        // confirm on commit
-        idempotentRepository.confirm(key);
+        if (removeOnCommit) {
+            idempotentRepository.remove(key);
+        } else {
+            // confirm on commit
+            idempotentRepository.confirm(key);
+        }
     }
 
     public void setTimeout(long timeout) {
@@ -151,6 +156,24 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp
         this.removeOnRollback = removeOnRollback;
     }
 
+    /**
+     * Whether to remove the file from the idempotent repository when doing a commit.
+     * <p/>
+     * By default this is false.
+     */
+    public boolean isRemoveOnCommit() {
+        return removeOnCommit;
+    }
+
+    /**
+     * Whether to remove the file from the idempotent repository when doing a commit.
+     * <p/>
+     * By default this is false.
+     */
+    public void setRemoveOnCommit(boolean removeOnCommit) {
+        this.removeOnCommit = removeOnCommit;
+    }
+
     protected String asKey(GenericFile<File> file) {
         // use absolute file path as default key, but evaluate if an expression key was configured
         String key = file.getAbsoluteFilePath();

http://git-wip-us.apache.org/repos/asf/camel/blob/504b0d84/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
index 5a31374..f9ceca2 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
@@ -134,6 +134,10 @@ public final class FileProcessStrategyFactory {
                 if (readLockRemoveOnRollback != null) {
                     readLockStrategy.setRemoveOnRollback(readLockRemoveOnRollback);
                 }
+                Boolean readLockRemoveOnCommit = (Boolean) params.get("readLockRemoveOnCommit");
+                if (readLockRemoveOnCommit != null) {
+                    readLockStrategy.setRemoveOnCommit(readLockRemoveOnCommit);
+                }
                 IdempotentRepository repo = (IdempotentRepository) params.get("readLockIdempotentRepository");
                 if (repo != null) {
                     readLockStrategy.setIdempotentRepository(repo);