You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/05/04 19:59:40 UTC
[1/6] camel git commit: CAMEL-8688: Stream cache now keeps track of the
number of copies that were spooled to disk,
so the temp file is only deleted when it is no longer in use. A
file can be shared when using wire tap etc. Thanks to Franz Forsthofer
Repository: camel
Updated Branches:
refs/heads/master cfdf18542 -> d1121a7b4
CAMEL-8688: Stream cache now keeps track of the number of copies that were spooled to disk, so the temp file is only deleted when it is no longer in use. A file can be shared when using wire tap etc. Thanks to Franz Forsthofer for the patch.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0fa7d693
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0fa7d693
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0fa7d693
Branch: refs/heads/master
Commit: 0fa7d6935b890cf8020dffede6145fe8fb7d376f
Parents: a535a47
Author: Claus Ibsen <da...@apache.org>
Authored: Mon May 4 19:31:15 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon May 4 19:31:30 2015 +0200
----------------------------------------------------------------------
.../component/netty4/http/NettyChannelBufferStreamCache.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/0fa7d693/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
index d8a183f..70635f0 100644
--- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
+++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
@@ -21,6 +21,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import io.netty.buffer.ByteBuf;
+import org.apache.camel.Exchange;
import org.apache.camel.StreamCache;
import org.apache.camel.util.IOHelper;
@@ -87,7 +88,7 @@ public final class NettyChannelBufferStreamCache extends InputStream implements
}
@Override
- public StreamCache copy() throws IOException {
+ public StreamCache copy(Exchange exchange) throws IOException {
return new NettyChannelBufferStreamCache(buffer.copy());
}
[2/6] camel git commit: CAMEL-8688: Stream cache now keeps track of the
number of copies that were spooled to disk,
so the temp file is only deleted when it is no longer in use. A
file can be shared when using wire tap etc. Thanks to Franz Forsthofer
Posted by da...@apache.org.
CAMEL-8688: Stream cache now keeps track of the number of copies that were spooled to disk, so the temp file is only deleted when it is no longer in use. A file can be shared when using wire tap etc. Thanks to Franz Forsthofer for the patch.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a99f6d57
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a99f6d57
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a99f6d57
Branch: refs/heads/master
Commit: a99f6d5710c3068fa3fe841d4f80dc82deb1142b
Parents: cfdf185
Author: Claus Ibsen <da...@apache.org>
Authored: Mon May 4 19:09:51 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon May 4 19:31:30 2015 +0200
----------------------------------------------------------------------
.../main/java/org/apache/camel/StreamCache.java | 6 +-
.../stream/ByteArrayInputStreamCache.java | 3 +-
.../converter/stream/CachedOutputStream.java | 121 ++-----------
.../converter/stream/FileInputStreamCache.java | 173 +++++++++++++++++--
.../converter/stream/InputStreamCache.java | 3 +-
.../camel/converter/stream/ReaderCache.java | 3 +-
.../camel/converter/stream/SourceCache.java | 3 +-
.../converter/stream/StreamSourceCache.java | 6 +-
.../camel/processor/MulticastProcessor.java | 2 +-
.../camel/processor/WireTapProcessor.java | 2 +-
.../processor/WireTapStreamCachingTest.java | 19 +-
.../apache/camel/util/MessageHelperTest.java | 2 +-
.../apache/camel/processor/twoCharacters.txt | 1 +
.../http/NettyChannelBufferStreamCache.java | 3 +-
14 files changed, 207 insertions(+), 140 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/StreamCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/StreamCache.java b/camel-core/src/main/java/org/apache/camel/StreamCache.java
index ecd9736..29f4284 100644
--- a/camel-core/src/main/java/org/apache/camel/StreamCache.java
+++ b/camel-core/src/main/java/org/apache/camel/StreamCache.java
@@ -29,7 +29,7 @@ import java.io.OutputStream;
* The Camel routing engine uses the {@link org.apache.camel.processor.CamelInternalProcessor.StreamCachingAdvice}
* to apply the stream cache during routing.
* <p/>
- * It is recommended in the {@link #copy()} method to let the copied stream start from the start. If the implementation
+ * It is recommended in the {@link #copy(Exchange)} method to let the copied stream start from the start. If the implementation
* does not support copy, then return <tt>null</tt>.
*
* @version
@@ -60,10 +60,12 @@ public interface StreamCache {
* Implementations note: A copy of the stream is recommended to read from the start
* of the stream.
*
+ * @param exchange exchange in which the stream cache object is used;
+ * can be used to delete resources of the stream cache when the exchange is completed
* @return a copy, or <tt>null</tt> if copy is not possible
* @throws java.io.IOException is thrown if the copy fails
*/
- StreamCache copy() throws IOException;
+ StreamCache copy(Exchange exchange) throws IOException;
/**
* Whether this {@link StreamCache} is in memory only or
http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java
index 3b1dacf..9375ee3 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/ByteArrayInputStreamCache.java
@@ -22,6 +22,7 @@ import java.io.FilterInputStream;
import java.io.IOException;
import java.io.OutputStream;
+import org.apache.camel.Exchange;
import org.apache.camel.StreamCache;
import org.apache.camel.util.IOHelper;
@@ -51,7 +52,7 @@ public class ByteArrayInputStreamCache extends FilterInputStream implements Stre
IOHelper.copyAndCloseInput(in, os);
}
- public StreamCache copy() throws IOException {
+ public StreamCache copy(Exchange exchange) throws IOException {
if (byteArrayForCopy == null) {
ByteArrayOutputStream baos = new ByteArrayOutputStream(in.available());
IOHelper.copy(in, baos);
http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
index 639e339..d722baf 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
@@ -16,29 +16,15 @@
*/
package org.apache.camel.converter.stream;
-import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.security.GeneralSecurityException;
-
-import javax.crypto.CipherOutputStream;
import org.apache.camel.Exchange;
import org.apache.camel.StreamCache;
-import org.apache.camel.converter.stream.FileInputStreamCache.FileInputStreamCloser;
+import org.apache.camel.converter.stream.FileInputStreamCache.TempFileManager;
import org.apache.camel.spi.StreamCachingStrategy;
-import org.apache.camel.spi.Synchronization;
-import org.apache.camel.spi.UnitOfWork;
-import org.apache.camel.support.SynchronizationAdapter;
-import org.apache.camel.util.FileUtil;
-import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* This output stream will store the content into a File if the stream context size is exceed the
@@ -50,7 +36,7 @@ import org.slf4j.LoggerFactory;
* <p/>
* You can get a cached input stream of this stream. The temp file which is created with this
* output stream will be deleted when you close this output stream or the cached
- * fileInputStream(s) is/are closed after the exchange is completed.
+ * fileInputStream(s) is/are closed after all the exchanges using the temp file are completed.
*/
public class CachedOutputStream extends OutputStream {
@Deprecated
@@ -61,16 +47,12 @@ public class CachedOutputStream extends OutputStream {
public static final String TEMP_DIR = "CamelCachedOutputStreamOutputDirectory";
@Deprecated
public static final String CIPHER_TRANSFORMATION = "CamelCachedOutputStreamCipherTransformation";
- private static final Logger LOG = LoggerFactory.getLogger(CachedOutputStream.class);
private final StreamCachingStrategy strategy;
private OutputStream currentStream;
private boolean inMemory = true;
private int totalLength;
- private File tempFile;
- private FileInputStreamCache fileInputStreamCache;
- private final FileInputStreamCloser fileInputStreamCloser = new FileInputStreamCloser();
- private CipherPair ciphers;
+ private final TempFileManager tempFileManager;
private final boolean closedOnCompletion;
public CachedOutputStream(Exchange exchange) {
@@ -79,44 +61,10 @@ public class CachedOutputStream extends OutputStream {
public CachedOutputStream(Exchange exchange, final boolean closedOnCompletion) {
this.closedOnCompletion = closedOnCompletion;
+ tempFileManager = new TempFileManager(closedOnCompletion);
+ tempFileManager.addExchange(exchange);
this.strategy = exchange.getContext().getStreamCachingStrategy();
currentStream = new CachedByteArrayOutputStream(strategy.getBufferSize());
- if (closedOnCompletion) {
- // add on completion so we can cleanup after the exchange is done such as deleting temporary files
- Synchronization onCompletion = new SynchronizationAdapter() {
- @Override
- public void onDone(Exchange exchange) {
- try {
- closeFileInputStreams();
- close();
- try {
- cleanUpTempFile();
- } catch (Exception e) {
- LOG.warn("Error deleting temporary cache file: " + tempFile + ". This exception will be ignored.", e);
- }
- } catch (Exception e) {
- LOG.warn("Error closing streams. This exception will be ignored.", e);
- }
- }
-
- @Override
- public String toString() {
- return "OnCompletion[CachedOutputStream]";
- }
- };
-
- UnitOfWork streamCacheUnitOfWork = exchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, UnitOfWork.class);
- if (streamCacheUnitOfWork != null) {
- // The stream cache must sometimes not be closed when the exchange is deleted. This is for example the
- // case in the splitter and multi-cast case with AggregationStrategy where the result of the sub-routes
- // are aggregated later in the main route. Here, the cached streams of the sub-routes must be closed with
- // the Unit of Work of the main route.
- streamCacheUnitOfWork.addSynchronization(onCompletion);
- } else {
- // add on completion so we can cleanup after the exchange is done such as deleting temporary files
- exchange.addOnCompletion(onCompletion);
- }
- }
}
public void flush() throws IOException {
@@ -127,12 +75,8 @@ public class CachedOutputStream extends OutputStream {
currentStream.close();
// need to clean up the temp file this time
if (!closedOnCompletion) {
- closeFileInputStreams();
- try {
- cleanUpTempFile();
- } catch (Exception e) {
- LOG.warn("Error deleting temporary cache file: " + tempFile + ". This exception will be ignored.", e);
- }
+ tempFileManager.closeFileInputStreams();
+ tempFileManager.cleanUpTempFile();
}
}
@@ -206,40 +150,17 @@ public class CachedOutputStream extends OutputStream {
throw new IllegalStateException("CurrentStream should be an instance of CachedByteArrayOutputStream but is: " + currentStream.getClass().getName());
}
} else {
- try {
- if (fileInputStreamCache == null) {
- fileInputStreamCache = new FileInputStreamCache(tempFile, ciphers, fileInputStreamCloser);
- }
- return fileInputStreamCache;
- } catch (FileNotFoundException e) {
- throw new IOException("Cached file " + tempFile + " not found", e);
- }
+ return tempFileManager.newStreamCache();
}
}
- private void closeFileInputStreams() {
- fileInputStreamCloser.close();
- fileInputStreamCache = null;
- }
-
- private void cleanUpTempFile() {
- // cleanup temporary file
- if (tempFile != null) {
- FileUtil.deleteFile(tempFile);
- tempFile = null;
- }
- }
private void pageToFileStream() throws IOException {
flush();
-
ByteArrayOutputStream bout = (ByteArrayOutputStream)currentStream;
- tempFile = FileUtil.createTempFile("cos", ".tmp", strategy.getSpoolDirectory());
-
- LOG.trace("Creating temporary stream cache file: {}", tempFile);
-
try {
- currentStream = createOutputStream(tempFile);
+ // creates an tmp file and a file output stream
+ currentStream = tempFileManager.createOutputStream(strategy);
bout.writeTo(currentStream);
} finally {
// ensure flag is flipped to file based
@@ -291,26 +212,4 @@ public class CachedOutputStream extends OutputStream {
}
}
- private OutputStream createOutputStream(File file) throws IOException {
- OutputStream out = new BufferedOutputStream(new FileOutputStream(file));
- if (ObjectHelper.isNotEmpty(strategy.getSpoolChiper())) {
- try {
- if (ciphers == null) {
- ciphers = new CipherPair(strategy.getSpoolChiper());
- }
- } catch (GeneralSecurityException e) {
- throw new IOException(e.getMessage(), e);
- }
- out = new CipherOutputStream(out, ciphers.getEncryptor()) {
- boolean closed;
- public void close() throws IOException {
- if (!closed) {
- super.close();
- closed = true;
- }
- }
- };
- }
- return out;
- }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
index c6e99ac..a0d6501 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
@@ -17,44 +17,60 @@
package org.apache.camel.converter.stream;
import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
+import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
import javax.crypto.CipherInputStream;
+import javax.crypto.CipherOutputStream;
+import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.StreamCache;
+import org.apache.camel.spi.StreamCachingStrategy;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.spi.UnitOfWork;
+import org.apache.camel.support.SynchronizationAdapter;
+import org.apache.camel.util.FileUtil;
import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A {@link StreamCache} for {@link File}s
*/
public final class FileInputStreamCache extends InputStream implements StreamCache {
private InputStream stream;
+ private final long length;
+ private final FileInputStreamCache.TempFileManager tempFileManager;
private final File file;
private final CipherPair ciphers;
- private final long length;
- private final FileInputStreamCache.FileInputStreamCloser closer;
+ /** Only for testing purposes.*/
public FileInputStreamCache(File file) throws FileNotFoundException {
- this(file, null, new FileInputStreamCloser());
+ this(new TempFileManager(file, true));
}
- FileInputStreamCache(File file, CipherPair ciphers, FileInputStreamCloser closer) throws FileNotFoundException {
- this.file = file;
+ FileInputStreamCache(TempFileManager closer) throws FileNotFoundException {
+ this.file = closer.getTempFile();
this.stream = null;
- this.ciphers = ciphers;
+ this.ciphers = closer.getCiphers();
this.length = file.length();
- this.closer = closer;
- this.closer.add(this);
+ this.tempFileManager = closer;
+ this.tempFileManager.add(this);
}
@Override
@@ -99,8 +115,9 @@ public final class FileInputStreamCache extends InputStream implements StreamCac
}
}
- public StreamCache copy() throws IOException {
- FileInputStreamCache copy = new FileInputStreamCache(file, ciphers, closer);
+ public StreamCache copy(Exchange exchange) throws IOException {
+ tempFileManager.addExchange(exchange);
+ FileInputStreamCache copy = new FileInputStreamCache(tempFileManager);
return copy;
}
@@ -146,16 +163,37 @@ public final class FileInputStreamCache extends InputStream implements StreamCac
}
/**
- * Collects all FileInputStreamCache instances of a temporary file which must be closed
- * at the end of the route.
+ * Manages the temporary file for the file input stream caches.
+ *
+ * Collects all FileInputStreamCache instances of the temporary file.
+ * Counts the number of exchanges which have a FileInputStreamCache instance of the temporary file.
+ * Deletes the temporary file, if all exchanges are done.
*
* @see CachedOutputStream
*/
- static class FileInputStreamCloser {
+ static class TempFileManager {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TempFileManager.class);
+ /** Indicator whether the file input stream caches are closed on completion of the exchanges. */
+ private final boolean closedOnCompletion;
+ private AtomicInteger exchangeCounter = new AtomicInteger();
+ private File tempFile;
+ private OutputStream outputStream; // file output stream
+ private CipherPair ciphers;
- // there can be several input streams, for example in the multi-cast parallel processing
+ // there can be several input streams, for example in the multi-cast, or wiretap parallel processing
private List<FileInputStreamCache> fileInputStreamCaches;
+
+ /** Only for testing.*/
+ private TempFileManager(File file, boolean closedOnCompletion) {
+ this(closedOnCompletion);
+ this.tempFile = file;
+ }
+ TempFileManager(boolean closedOnCompletion) {
+ this.closedOnCompletion = closedOnCompletion;
+ }
+
/** Adds a FileInputStreamCache instance to the closer.
* <p>
* Must be synchronized, because can be accessed by several threads.
@@ -167,14 +205,119 @@ public final class FileInputStreamCache extends InputStream implements StreamCac
fileInputStreamCaches.add(fileInputStreamCache);
}
- void close() {
+ void addExchange(Exchange exchange) {
+ if (closedOnCompletion) {
+ exchangeCounter.incrementAndGet();
+ // add on completion so we can cleanup after the exchange is done such as deleting temporary files
+ Synchronization onCompletion = new SynchronizationAdapter() {
+ @Override
+ public void onDone(Exchange exchange) {
+ int actualExchanges = exchangeCounter.decrementAndGet();
+ if (actualExchanges == 0) {
+ // only one exchange (one thread) left, therefore we must not synchronize the following lines of code
+ try {
+ closeFileInputStreams();
+ if (outputStream != null) {
+ outputStream.close();
+ }
+ try {
+ cleanUpTempFile();
+ } catch (Exception e) {
+ LOG.warn("Error deleting temporary cache file: " + tempFile + ". This exception will be ignored.", e);
+ }
+ } catch (Exception e) {
+ LOG.warn("Error closing streams. This exception will be ignored.", e);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "OnCompletion[CachedOutputStream]";
+ }
+ };
+ UnitOfWork streamCacheUnitOfWork = exchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, UnitOfWork.class);
+ if (streamCacheUnitOfWork != null) {
+ // The stream cache must sometimes not be closed when the exchange is deleted. This is for example the
+ // case in the splitter and multi-cast case with AggregationStrategy where the result of the sub-routes
+ // are aggregated later in the main route. Here, the cached streams of the sub-routes must be closed with
+ // the Unit of Work of the main route.
+ streamCacheUnitOfWork.addSynchronization(onCompletion);
+ } else {
+ // add on completion so we can cleanup after the exchange is done such as deleting temporary files
+ exchange.addOnCompletion(onCompletion);
+ }
+ }
+ }
+
+ OutputStream createOutputStream(StreamCachingStrategy strategy) throws IOException {
+ // should only be called once
+ if (tempFile != null) {
+ throw new IllegalStateException("The method 'createOutputStream' can only be called once!");
+ }
+ tempFile = FileUtil.createTempFile("cos", ".tmp", strategy.getSpoolDirectory());
+
+ LOG.trace("Creating temporary stream cache file: {}", tempFile);
+ OutputStream out = new BufferedOutputStream(new FileOutputStream(tempFile));
+ if (ObjectHelper.isNotEmpty(strategy.getSpoolChiper())) {
+ try {
+ if (ciphers == null) {
+ ciphers = new CipherPair(strategy.getSpoolChiper());
+ }
+ } catch (GeneralSecurityException e) {
+ throw new IOException(e.getMessage(), e);
+ }
+ out = new CipherOutputStream(out, ciphers.getEncryptor()) {
+ boolean closed;
+ public void close() throws IOException {
+ if (!closed) {
+ super.close();
+ closed = true;
+ }
+ }
+ };
+ }
+ outputStream = out;
+ return out;
+ }
+
+ FileInputStreamCache newStreamCache() throws IOException {
+ try {
+ return new FileInputStreamCache(this);
+ } catch (FileNotFoundException e) {
+ throw new IOException("Cached file " + tempFile + " not found", e);
+ }
+ }
+
+ void closeFileInputStreams() {
if (fileInputStreamCaches != null) {
for (FileInputStreamCache fileInputStreamCache : fileInputStreamCaches) {
fileInputStreamCache.close();
}
fileInputStreamCaches.clear();
}
+ }
+
+ void cleanUpTempFile() {
+ // cleanup temporary file
+ try {
+ if (tempFile != null) {
+ FileUtil.deleteFile(tempFile);
+ tempFile = null;
+ }
+ } catch (Exception e) {
+ LOG.warn("Error deleting temporary cache file: " + tempFile + ". This exception will be ignored.", e);
+ }
+ }
+
+ File getTempFile() {
+ return tempFile;
+ }
+
+ CipherPair getCiphers() {
+ return ciphers;
}
+
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
index ba7f12e..78422a7 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
@@ -20,6 +20,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.OutputStream;
+import org.apache.camel.Exchange;
import org.apache.camel.StreamCache;
/**
@@ -40,7 +41,7 @@ public final class InputStreamCache extends ByteArrayInputStream implements Stre
os.write(buf, pos, count - pos);
}
- public StreamCache copy() {
+ public StreamCache copy(Exchange exchange) {
return new InputStreamCache(buf, count);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
index bed761c..2890945 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.io.StringReader;
+import org.apache.camel.Exchange;
import org.apache.camel.StreamCache;
/**
@@ -51,7 +52,7 @@ public class ReaderCache extends StringReader implements StreamCache {
os.write(data.getBytes());
}
- public StreamCache copy() throws IOException {
+ public StreamCache copy(Exchange exchange) throws IOException {
return new ReaderCache(data);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
index 16f8422..4f00eb4 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
@@ -19,6 +19,7 @@ package org.apache.camel.converter.stream;
import java.io.IOException;
import java.io.OutputStream;
+import org.apache.camel.Exchange;
import org.apache.camel.StreamCache;
import org.apache.camel.StringSource;
import org.apache.camel.util.IOHelper;
@@ -44,7 +45,7 @@ public final class SourceCache extends StringSource implements StreamCache {
IOHelper.copy(getInputStream(), os);
}
- public StreamCache copy() throws IOException {
+ public StreamCache copy(Exchange exchange) throws IOException {
return new SourceCache(getText());
}
http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java b/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
index 499f799..a7edfc9 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
@@ -83,12 +83,12 @@ public final class StreamSourceCache extends StreamSource implements StreamCache
}
}
- public StreamCache copy() throws IOException {
+ public StreamCache copy(Exchange exchange) throws IOException {
if (streamCache != null) {
- return new StreamSourceCache(streamCache.copy());
+ return new StreamSourceCache(streamCache.copy(exchange));
}
if (readCache != null) {
- return new StreamSourceCache(readCache.copy());
+ return new StreamSourceCache(readCache.copy(exchange));
}
return null;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 334ceb1..e4a2ef8 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -953,7 +953,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
if (index > 0) {
// copy it otherwise parallel processing is not possible,
// because streams can only be read once
- StreamCache copiedStreamCache = streamCache.copy();
+ StreamCache copiedStreamCache = streamCache.copy(copy);
if (copiedStreamCache != null) {
copy.getIn().setBody(copiedStreamCache);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index a74e663..1d6b835 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -180,7 +180,7 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor,
if (msg.getBody() instanceof StreamCache) {
// in parallel processing case, the stream must be copied, therefore get the stream
StreamCache cache = (StreamCache) msg.getBody();
- StreamCache copied = cache.copy();
+ StreamCache copied = cache.copy(answer);
if (copied != null) {
msg.setBody(copied);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/test/java/org/apache/camel/processor/WireTapStreamCachingTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/WireTapStreamCachingTest.java b/camel-core/src/test/java/org/apache/camel/processor/WireTapStreamCachingTest.java
index 0a87c13..1db7307 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/WireTapStreamCachingTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/WireTapStreamCachingTest.java
@@ -17,6 +17,7 @@
package org.apache.camel.processor;
import java.io.StringReader;
+
import javax.xml.transform.stream.StreamSource;
import org.apache.camel.ContextTestSupport;
@@ -26,6 +27,7 @@ import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
/**
* @version
@@ -51,7 +53,19 @@ public class WireTapStreamCachingTest extends ContextTestSupport {
assertMockEndpointsSatisfied();
}
+
+ @Test
+ public void testSendingAMessageUsingWiretapShouldNotDeleteStreamFileBeforeAllExcangesAreComplete() throws InterruptedException {
+
+ x.expectedMessageCount(1);
+ y.expectedMessageCount(1);
+ z.expectedMessageCount(1);
+
+ // the used file should contain more than one character in order to be streamed into the file system
+ template.sendBody("direct:a", this.getClass().getClassLoader().getResourceAsStream("org/apache/camel/processor/twoCharacters.txt"));
+ assertMockEndpointsSatisfied();
+ }
@Override
protected void setUp() throws Exception {
@@ -76,6 +90,8 @@ public class WireTapStreamCachingTest extends ContextTestSupport {
public void configure() {
// enable stream caching
context.setStreamCaching(true);
+ // set stream threshold to 1, in order to stream into the file system
+ context.getStreamCachingStrategy().setSpoolThreshold(1);
errorHandler(deadLetterChannel("mock:error").redeliveryDelay(0).maximumRedeliveries(3));
@@ -83,7 +99,8 @@ public class WireTapStreamCachingTest extends ContextTestSupport {
from("direct:a").wireTap("direct:x").wireTap("direct:y").wireTap("direct:z");
from("direct:x").process(processor).to("mock:x");
- from("direct:y").process(processor).to("mock:y");
+ // even if a process takes more time than the others the wire tap shall work
+ from("direct:y").delay(2000).process(processor).to("mock:y");
from("direct:z").process(processor).to("mock:z");
}
};
http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java b/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java
index 7448989..cf80614 100644
--- a/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java
+++ b/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java
@@ -63,7 +63,7 @@ public class MessageHelperTest extends TestCase {
// noop
}
- public StreamCache copy() throws IOException {
+ public StreamCache copy(Exchange exchange) throws IOException {
return null;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/camel-core/src/test/resources/org/apache/camel/processor/twoCharacters.txt
----------------------------------------------------------------------
diff --git a/camel-core/src/test/resources/org/apache/camel/processor/twoCharacters.txt b/camel-core/src/test/resources/org/apache/camel/processor/twoCharacters.txt
new file mode 100644
index 0000000..dfc9179
--- /dev/null
+++ b/camel-core/src/test/resources/org/apache/camel/processor/twoCharacters.txt
@@ -0,0 +1 @@
+AB
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/a99f6d57/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java
index 8ecb8f7..b3afc4a 100644
--- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyChannelBufferStreamCache.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import org.apache.camel.Exchange;
import org.apache.camel.StreamCache;
import org.apache.camel.util.IOHelper;
import org.jboss.netty.buffer.ChannelBuffer;
@@ -87,7 +88,7 @@ public final class NettyChannelBufferStreamCache extends InputStream implements
}
@Override
- public StreamCache copy() throws IOException {
+ public StreamCache copy(Exchange exchange) throws IOException {
return new NettyChannelBufferStreamCache(buffer.copy());
}
[5/6] camel git commit: CAMEL-8688: Stream cache now keeps track on
number of copies that was spooled to disk,
so when the temp file is deleted its only deleted when no longer in use. A
file can be shared if using wire tap etc. Thanks to Franz Forsthofer
Posted by da...@apache.org.
CAMEL-8688: Stream cache now keeps track on number of copies that was spooled to disk, so when the temp file is deleted its only deleted when no longer in use. A file can be shared if using wire tap etc. Thanks to Franz Forsthofer for the patch.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f1c5b547
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f1c5b547
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f1c5b547
Branch: refs/heads/master
Commit: f1c5b5472deca2f435e669c814655dd26e3512f6
Parents: 504b0d8
Author: Claus Ibsen <da...@apache.org>
Authored: Mon May 4 20:02:08 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon May 4 20:02:08 2015 +0200
----------------------------------------------------------------------
.../camel/component/cxf/converter/CachedCxfPayload.java | 8 ++++----
.../camel/component/cxf/converter/CachedCxfPayloadTest.java | 2 +-
2 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/f1c5b547/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/converter/CachedCxfPayload.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/converter/CachedCxfPayload.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/converter/CachedCxfPayload.java
index d54a026..7cc94a6 100644
--- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/converter/CachedCxfPayload.java
+++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/converter/CachedCxfPayload.java
@@ -92,14 +92,14 @@ public class CachedCxfPayload<T> extends CxfPayload<T> implements StreamCache {
}
}
- private CachedCxfPayload(CachedCxfPayload<T> orig) throws IOException {
+ private CachedCxfPayload(CachedCxfPayload<T> orig, Exchange exchange) throws IOException {
super(orig.getHeaders(), new ArrayList<Source>(orig.getBodySources()), orig.getNsMap());
ListIterator<Source> li = getBodySources().listIterator();
this.xml = orig.xml;
while (li.hasNext()) {
Source source = li.next();
if (source instanceof StreamCache) {
- li.set((Source) (((StreamCache) source)).copy());
+ li.set((Source) (((StreamCache) source)).copy(exchange));
}
}
}
@@ -149,8 +149,8 @@ public class CachedCxfPayload<T> extends CxfPayload<T> implements StreamCache {
}
@Override
- public StreamCache copy() throws IOException {
- return new CachedCxfPayload<T>(this);
+ public StreamCache copy(Exchange exchange) throws IOException {
+ return new CachedCxfPayload<T>(this, exchange);
}
private static class DelegatingXMLStreamReader implements XMLStreamReader {
http://git-wip-us.apache.org/repos/asf/camel/blob/f1c5b547/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CachedCxfPayloadTest.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CachedCxfPayloadTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CachedCxfPayloadTest.java
index 6c7618b..6eb096b 100644
--- a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CachedCxfPayloadTest.java
+++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CachedCxfPayloadTest.java
@@ -84,7 +84,7 @@ public class CachedCxfPayloadTest extends ExchangeTestSupport {
cache.reset();
- CachedCxfPayload clone = (CachedCxfPayload) cache.copy();
+ CachedCxfPayload clone = (CachedCxfPayload) cache.copy(exchange);
bos = new ByteArrayOutputStream();
clone.writeTo(bos);
[6/6] camel git commit: CAMEL-8727: File consumer - Add read lock
that is based on idempotent repository
Posted by da...@apache.org.
CAMEL-8727: File consumer - Add read lock that is based on idempotent repository
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d1121a7b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d1121a7b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d1121a7b
Branch: refs/heads/master
Commit: d1121a7b4b90599b997f4df88b468cddcef18074
Parents: f1c5b54
Author: Claus Ibsen <da...@apache.org>
Authored: Mon May 4 20:02:17 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon May 4 20:02:17 2015 +0200
----------------------------------------------------------------------
.../java/org/apache/camel/component/file/GenericFileEndpoint.java | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/d1121a7b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
index ae5381b..ea122cf 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
@@ -939,6 +939,7 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
* This option applies only for readLock=idempotent.
* This option allows you to specify whether to remove the file name entry from the idempotent repository
* when processing the file failed and a rollback happens.
+ * If this option is false, then the file name entry is confirmed (as if processing the file had been committed).
*/
public void setReadLockRemoveOnRollback(boolean readLockRemoveOnRollback) {
this.readLockRemoveOnRollback = readLockRemoveOnRollback;
[3/6] camel git commit: Fixed CS
Posted by da...@apache.org.
Fixed CS
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a535a474
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a535a474
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a535a474
Branch: refs/heads/master
Commit: a535a4746c1ec655fb45edd27f7a4be98d7a86fa
Parents: a99f6d5
Author: Claus Ibsen <da...@apache.org>
Authored: Mon May 4 19:28:23 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon May 4 19:31:30 2015 +0200
----------------------------------------------------------------------
.../src/test/java/org/apache/camel/language/simple/Constants.java | 3 +++
1 file changed, 3 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/a535a474/camel-core/src/test/java/org/apache/camel/language/simple/Constants.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/language/simple/Constants.java b/camel-core/src/test/java/org/apache/camel/language/simple/Constants.java
index 6edb246..d65a196 100644
--- a/camel-core/src/test/java/org/apache/camel/language/simple/Constants.java
+++ b/camel-core/src/test/java/org/apache/camel/language/simple/Constants.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.language.simple;
+// CHECKSTYLE:OFF
+// we want to use the code as-is as that is how end users may code
public class Constants {
public static String BAR = "456";
@@ -26,3 +28,4 @@ public class Constants {
}
}
+// CHECKSTYLE:ON
[4/6] camel git commit: CAMEL-8727: File consumer - Add read lock
that is based on idempotent repository
Posted by da...@apache.org.
CAMEL-8727: File consumer - Add read lock that is based on idempotent repository
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/504b0d84
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/504b0d84
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/504b0d84
Branch: refs/heads/master
Commit: 504b0d84078944c7e632316d57a58fcb220ca494
Parents: 0fa7d69
Author: Claus Ibsen <da...@apache.org>
Authored: Mon May 4 19:40:20 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon May 4 19:40:31 2015 +0200
----------------------------------------------------------------------
.../component/file/GenericFileEndpoint.java | 20 +++++++++++++++
...ileIdempotentRepositoryReadLockStrategy.java | 27 ++++++++++++++++++--
.../strategy/FileProcessStrategyFactory.java | 4 +++
3 files changed, 49 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/504b0d84/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
index 9cd7b4f..ae5381b 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
@@ -173,6 +173,8 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
@UriParam(label = "consumer", defaultValue = "true")
protected boolean readLockRemoveOnRollback = true;
@UriParam(label = "consumer")
+ protected boolean readLockRemoveOnCommit;
+ @UriParam(label = "consumer")
protected GenericFileExclusiveReadLockStrategy<T> exclusiveReadLockStrategy;
@UriParam(label = "consumer")
protected ExceptionHandler onCompletionExceptionHandler;
@@ -942,6 +944,23 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
this.readLockRemoveOnRollback = readLockRemoveOnRollback;
}
+ public boolean isReadLockRemoveOnCommit() {
+ return readLockRemoveOnCommit;
+ }
+
+ /**
+ * This option applies only for readLock=idempotent.
+ * This option allows you to specify whether to remove the file name entry from the idempotent repository
+ * when processing the file succeeded and a commit happens.
+ * <p/>
+ * By default the file name entry is not removed, which ensures that no race condition occurs in which another
+ * active node may attempt to grab the file. Instead the idempotent repository may support eviction strategies
+ * that you can configure to evict the file name entry after X minutes - this ensures no problems with race conditions.
+ */
+ public void setReadLockRemoveOnCommit(boolean readLockRemoveOnCommit) {
+ this.readLockRemoveOnCommit = readLockRemoveOnCommit;
+ }
+
public int getBufferSize() {
return bufferSize;
}
@@ -1256,6 +1275,7 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
params.put("readLockLoggingLevel", readLockLoggingLevel);
params.put("readLockMinAge", readLockMinAge);
params.put("readLockRemoveOnRollback", readLockRemoveOnRollback);
+ params.put("readLockRemoveOnCommit", readLockRemoveOnCommit);
return params;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/504b0d84/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
index 763b7e0..b9cf193 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
@@ -47,6 +47,7 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp
private CamelContext camelContext;
private IdempotentRepository<String> idempotentRepository;
private boolean removeOnRollback = true;
+ private boolean removeOnCommit;
@Override
public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) throws Exception {
@@ -91,8 +92,12 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp
@Override
public void releaseExclusiveReadLockOnCommit(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
String key = asKey(file);
- // confirm on commit
- idempotentRepository.confirm(key);
+ if (removeOnCommit) {
+ idempotentRepository.remove(key);
+ } else {
+ // confirm on commit
+ idempotentRepository.confirm(key);
+ }
}
public void setTimeout(long timeout) {
@@ -151,6 +156,24 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp
this.removeOnRollback = removeOnRollback;
}
+ /**
+ * Whether to remove the file from the idempotent repository when doing a commit.
+ * <p/>
+ * By default this is false.
+ */
+ public boolean isRemoveOnCommit() {
+ return removeOnCommit;
+ }
+
+ /**
+ * Whether to remove the file from the idempotent repository when doing a commit.
+ * <p/>
+ * By default this is false.
+ */
+ public void setRemoveOnCommit(boolean removeOnCommit) {
+ this.removeOnCommit = removeOnCommit;
+ }
+
protected String asKey(GenericFile<File> file) {
// use absolute file path as default key, but evaluate if an expression key was configured
String key = file.getAbsoluteFilePath();
http://git-wip-us.apache.org/repos/asf/camel/blob/504b0d84/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
index 5a31374..f9ceca2 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
@@ -134,6 +134,10 @@ public final class FileProcessStrategyFactory {
if (readLockRemoveOnRollback != null) {
readLockStrategy.setRemoveOnRollback(readLockRemoveOnRollback);
}
+ Boolean readLockRemoveOnCommit = (Boolean) params.get("readLockRemoveOnCommit");
+ if (readLockRemoveOnCommit != null) {
+ readLockStrategy.setRemoveOnCommit(readLockRemoveOnCommit);
+ }
IdempotentRepository repo = (IdempotentRepository) params.get("readLockIdempotentRepository");
if (repo != null) {
readLockStrategy.setIdempotentRepository(repo);