You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2018/03/29 11:18:19 UTC

[camel] 02/03: CAMEL-12382: FileConsumer - Allow to delay readLock release tasks on idempotent read-lock

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit bcc573257a883c1a292f469e7a31999379196e1d
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Mar 29 11:34:17 2018 +0200

    CAMEL-12382: FileConsumer - Allow to delay readLock release tasks on idempotent read-lock
---
 camel-core/src/main/docs/file-component.adoc       | 10 +--
 .../apache/camel/component/file/FileConsumer.java  |  4 +-
 .../apache/camel/component/file/FileEndpoint.java  |  2 +-
 .../camel/component/file/GenericFileConsumer.java  | 18 +++--
 .../camel/component/file/GenericFileEndpoint.java  | 37 ++++++---
 .../component/file/GenericFileOnCompletion.java    |  6 +-
 ...dempotentChangedRepositoryReadLockStrategy.java |  3 -
 .../FileIdempotentRepositoryReadLockStrategy.java  |  3 -
 .../GenericFileProcessStrategySupport.java         | 35 +++++++-
 .../camel/component/file/NewFileConsumerTest.java  |  2 +-
 .../FileIdempotentReadLockDelayedAsyncTest.java    | 92 ++++++++++++++++++++++
 .../FileIdempotentReadLockDelayedTest.java         | 91 +++++++++++++++++++++
 .../apache/camel/processor/ShutdownDeferTest.java  |  3 +-
 .../camel/processor/ShutdownNotDeferTest.java      |  3 +-
 14 files changed, 271 insertions(+), 38 deletions(-)

diff --git a/camel-core/src/main/docs/file-component.adoc b/camel-core/src/main/docs/file-component.adoc
index 4213496..46ce0e1 100644
--- a/camel-core/src/main/docs/file-component.adoc
+++ b/camel-core/src/main/docs/file-component.adoc
@@ -136,15 +136,15 @@ with the following path and query parameters:
 | *readLock* (lock) | Used by consumer, to only poll the files if it has exclusive read-lock on the file (i.e. the file is not in-progress or being written). Camel will wait until the file lock is granted. This option provides the build in strategies: none - No read lock is in use markerFile - Camel creates a marker file (fileName.camelLock) and then holds a lock on it. This option is not available for the FTP component changed - Changed is using file length/modification timestamp to det [...]
 | *readLockCheckInterval* (lock) | Interval in millis for the read-lock, if supported by the read lock. This interval is used for sleeping between attempts to acquire the read lock. For example when using the changed read lock, you can set a higher interval period to cater for slow writes. The default of 1 sec. may be too fast if the producer is very slow writing the file. Notice: For FTP the default readLockCheckInterval is 5000. The readLockTimeout value must be higher than readLockChe [...]
 | *readLockDeleteOrphanLock Files* (lock) | Whether or not read lock with marker files should upon startup delete any orphan read lock files, which may have been left on the file system, if Camel was not properly shutdown (such as a JVM crash). If turning this option to false then any orphaned lock file will cause Camel to not attempt to pickup that file, this could also be due another node is concurrently reading files from the same shared directory. | true | boolean
-| *readLockIdempotentRelease Async* (lock) | Whether the delayed release task should be synchronous or asynchronous. | false | boolean
-| *readLockIdempotentRelease AsyncPoolSize* (lock) | The number of threads in the scheduled thread pool when using asynchronous release tasks. |  | int
-| *readLockIdempotentRelease Delay* (lock) | Whether to delay the release task for a period of millis. |  | int
-| *readLockIdempotentRelease ExecutorService* (lock) | To use a custom and shared thread pool for asynchronous release tasks. |  | ScheduledExecutor Service
+| *readLockIdempotentRelease Async* (lock) | Whether the delayed release task should be synchronous or asynchronous. See more details at the readLockIdempotentReleaseDelay option. | false | boolean
+| *readLockIdempotentRelease AsyncPoolSize* (lock) | The number of threads in the scheduled thread pool when using asynchronous release tasks. Using a default of 1 core threads should be sufficient in almost all use-cases, only set this to a higher value if either updating the idempotent repository is slow, or there are a lot of files to process. This option is not in-use if you use a shared thread pool by configuring the readLockIdempotentReleaseExecutorService option. See more details  [...]
+| *readLockIdempotentRelease Delay* (lock) | Whether to delay the release task for a period of millis. This can be used to delay the release tasks to expand the window when a file is regarded as read-locked, in an active/active cluster scenario with a shared idempotent repository, to ensure other nodes cannot potentially scan and acquire the same file, due to race-conditions. By expanding the time-window of the release tasks helps prevents these situations. Note delaying is only needed i [...]
+| *readLockIdempotentRelease ExecutorService* (lock) | To use a custom and shared thread pool for asynchronous release tasks. See more details at the readLockIdempotentReleaseDelay option. |  | ScheduledExecutor Service
 | *readLockLoggingLevel* (lock) | Logging level used when a read lock could not be acquired. By default a WARN is logged. You can change this level, for example to OFF to not have any logging. This option is only applicable for readLock of types: changed, fileLock, idempotent, idempotent-changed, idempotent-rename, rename. | DEBUG | LoggingLevel
 | *readLockMarkerFile* (lock) | Whether to use marker file with the changed, rename, or exclusive read lock types. By default a marker file is used as well to guard against other processes picking up the same files. This behavior can be turned off by setting this option to false. For example if you do not want to write marker files to the file systems by the Camel application. | true | boolean
 | *readLockMinAge* (lock) | This option applied only for readLock=change. This option allows to specify a minimum age the file must be before attempting to acquire the read lock. For example use readLockMinAge=300s to require the file is at last 5 minutes old. This can speedup the changed read lock as it will only attempt to acquire files which are at least that given age. | 0 | long
 | *readLockMinLength* (lock) | This option applied only for readLock=changed. This option allows you to configure a minimum file length. By default Camel expects the file to contain data, and thus the default value is 1. You can set this option to zero, to allow consuming zero-length files. | 1 | long
-| *readLockRemoveOnCommit* (lock) | This option applied only for readLock=idempotent. This option allows to specify whether to remove the file name entry from the idempotent repository when processing the file is succeeded and a commit happens. By default the file is not removed which ensures that any race-condition do not occur so another active node may attempt to grab the file. Instead the idempotent repository may support eviction strategies that you can configure to evict the file n [...]
+| *readLockRemoveOnCommit* (lock) | This option applied only for readLock=idempotent. This option allows to specify whether to remove the file name entry from the idempotent repository when processing the file is succeeded and a commit happens. By default the file is not removed which ensures that any race-condition do not occur so another active node may attempt to grab the file. Instead the idempotent repository may support eviction strategies that you can configure to evict the file n [...]
 | *readLockRemoveOnRollback* (lock) | This option applied only for readLock=idempotent. This option allows to specify whether to remove the file name entry from the idempotent repository when processing the file failed and a rollback happens. If this option is false, then the file name entry is confirmed (as if the file did a commit). | true | boolean
 | *readLockTimeout* (lock) | Optional timeout in millis for the read-lock, if supported by the read-lock. If the read-lock could not be granted and the timeout triggered, then Camel will skip the file. At next poll Camel, will try the file again, and this time maybe the read-lock could be granted. Use a value of 0 or lower to indicate forever. Currently fileLock, changed and rename support the timeout. Notice: For FTP the default readLockTimeout value is 20000 instead of 10000. The readL [...]
 | *backoffErrorThreshold* (scheduler) | The number of subsequent error polls (failed due some error) that should happen before the backoffMultipler should kick-in. |  | int
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java b/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
index e1cc643..860b098 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
@@ -42,8 +42,8 @@ public class FileConsumer extends GenericFileConsumer<File> {
     private String endpointPath;
     private Set<String> extendedAttributes;
 
-    public FileConsumer(FileEndpoint endpoint, Processor processor, GenericFileOperations<File> operations) {
-        super(endpoint, processor, operations);
+    public FileConsumer(FileEndpoint endpoint, Processor processor, GenericFileOperations<File> operations, GenericFileProcessStrategy<File> processStrategy) {
+        super(endpoint, processor, operations, processStrategy);
         this.endpointPath = endpoint.getConfiguration().getDirectory();
 
         if (endpoint.getExtendedAttributes() != null) {
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
index 6f22330..bfc75a1 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
@@ -182,7 +182,7 @@ public class FileEndpoint extends GenericFileEndpoint<File> {
      * @return the created consumer
      */
     protected FileConsumer newFileConsumer(Processor processor, GenericFileOperations<File> operations) {
-        return new FileConsumer(this, processor, operations);
+        return new FileConsumer(this, processor, operations, processStrategy != null ? processStrategy : createGenericFileStrategy());
     }
 
     public File getFile() {
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
index 1cd922f..6129620 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Queue;
 import java.util.regex.Pattern;
 
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
@@ -32,6 +33,7 @@ import org.apache.camel.impl.ScheduledBatchPollingConsumer;
 import org.apache.camel.support.EmptyAsyncCallback;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
 import org.apache.camel.util.StopWatch;
 import org.apache.camel.util.StringHelper;
 import org.apache.camel.util.TimeUtils;
@@ -45,6 +47,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
     protected final Logger log = LoggerFactory.getLogger(getClass());
     protected GenericFileEndpoint<T> endpoint;
     protected GenericFileOperations<T> operations;
+    protected GenericFileProcessStrategy<T> processStrategy;
     protected String fileExpressionResult;
     protected volatile ShutdownRunningTask shutdownRunningTask;
     protected volatile int pendingExchanges;
@@ -54,10 +57,11 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
     private final Pattern includePattern;
     private final Pattern excludePattern;
 
-    public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor processor, GenericFileOperations<T> operations) {
+    public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor processor, GenericFileOperations<T> operations, GenericFileProcessStrategy<T> processStrategy) {
         super(endpoint, processor);
         this.endpoint = endpoint;
         this.operations = operations;
+        this.processStrategy = processStrategy;
 
         this.includePattern = endpoint.getIncludePattern();
         this.excludePattern = endpoint.getExcludePattern();
@@ -97,7 +101,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
         // must prepare on startup the very first time
         if (!prepareOnStartup) {
             // prepare on startup
-            endpoint.getGenericFileProcessStrategy().prepareOnStartup(operations, endpoint);
+            processStrategy.prepareOnStartup(operations, endpoint);
             prepareOnStartup = true;
         }
 
@@ -352,8 +356,6 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
         String absoluteFileName = file.getAbsoluteFilePath();
 
         // check if we can begin processing the file
-        final GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy();
-
         Exception beginCause = null;
         boolean begin = false;
         try {
@@ -438,7 +440,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
 
             // register on completion callback that does the completion strategies
             // (for instance to move the file after we have processed it)
-            exchange.addOnCompletion(new GenericFileOnCompletion<T>(endpoint, operations, target, absoluteFileName));
+            exchange.addOnCompletion(new GenericFileOnCompletion<T>(endpoint, operations, processStrategy, target, absoluteFileName));
 
             log.debug("About to process file: {} using exchange: {}", target, exchange);
 
@@ -712,6 +714,11 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
 
     @Override
     protected void doStart() throws Exception {
+        // inject CamelContext before starting as it may be needed
+        if (processStrategy instanceof CamelContextAware) {
+            ((CamelContextAware) processStrategy).setCamelContext(getEndpoint().getCamelContext());
+        }
+        ServiceHelper.startService(processStrategy);
         super.doStart();
     }
 
@@ -719,6 +726,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
     protected void doStop() throws Exception {
         prepareOnStartup = false;
         super.doStop();
+        ServiceHelper.stopService(processStrategy);
     }
 
     @Override
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
index 9cad86a..59b4951 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
@@ -23,7 +23,6 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -228,14 +227,6 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
         return StringHelper.sanitize(message.getMessageId());
     }
 
-    public GenericFileProcessStrategy<T> getGenericFileProcessStrategy() {
-        if (processStrategy == null) {
-            processStrategy = createGenericFileStrategy();
-            log.debug("Using Generic file process strategy: {}", processStrategy);
-        }
-        return processStrategy;
-    }
-
     /**
      * This implementation will <b>not</b> load the file content.
      * Any file locking is neither in use by this implementation..
@@ -990,6 +981,8 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
      * By default the file is not removed which ensures that any race-condition do not occur so another active
      * node may attempt to grab the file. Instead the idempotent repository may support eviction strategies
      * that you can configure to evict the file name entry after X minutes - this ensures no problems with race conditions.
+     * <p/>
+     * See more details at the readLockIdempotentReleaseDelay option.
      */
     public void setReadLockRemoveOnCommit(boolean readLockRemoveOnCommit) {
         this.readLockRemoveOnCommit = readLockRemoveOnCommit;
@@ -997,6 +990,11 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
 
     /**
      * Whether to delay the release task for a period of millis.
+     * <p/>
+     * This can be used to delay the release tasks to expand the window when a file is regarded as read-locked,
+     * in an active/active cluster scenario with a shared idempotent repository, to ensure other nodes cannot potentially scan and acquire
+     * the same file, due to race-conditions. By expanding the time-window of the release tasks helps prevents these situations.
+     * Note delaying is only needed if you have configured readLockRemoveOnCommit to true.
      */
     public void setReadLockIdempotentReleaseDelay(int readLockIdempotentReleaseDelay) {
         this.readLockIdempotentReleaseDelay = readLockIdempotentReleaseDelay;
@@ -1008,6 +1006,8 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
 
     /**
      * Whether the delayed release task should be synchronous or asynchronous.
+     * <p/>
+     * See more details at the readLockIdempotentReleaseDelay option.
      */
     public void setReadLockIdempotentReleaseAsync(boolean readLockIdempotentReleaseAsync) {
         this.readLockIdempotentReleaseAsync = readLockIdempotentReleaseAsync;
@@ -1019,6 +1019,11 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
 
     /**
      * The number of threads in the scheduled thread pool when using asynchronous release tasks.
+     * Using a default of 1 core threads should be sufficient in almost all use-cases, only set this to a higher value
+     * if either updating the idempotent repository is slow, or there are a lot of files to process.
+     * This option is not in-use if you use a shared thread pool by configuring the readLockIdempotentReleaseExecutorService option.
+     * <p/>
+     * See more details at the readLockIdempotentReleaseDelay option.
      */
     public void setReadLockIdempotentReleaseAsyncPoolSize(int readLockIdempotentReleaseAsyncPoolSize) {
         this.readLockIdempotentReleaseAsyncPoolSize = readLockIdempotentReleaseAsyncPoolSize;
@@ -1030,6 +1035,8 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
 
     /**
      * To use a custom and shared thread pool for asynchronous release tasks.
+     * <p/>
+     * See more details at the readLockIdempotentReleaseDelay option.
      */
     public void setReadLockIdempotentReleaseExecutorService(ScheduledExecutorService readLockIdempotentReleaseExecutorService) {
         this.readLockIdempotentReleaseExecutorService = readLockIdempotentReleaseExecutorService;
@@ -1351,10 +1358,16 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
         params.put("readLockMinAge", readLockMinAge);
         params.put("readLockRemoveOnRollback", readLockRemoveOnRollback);
         params.put("readLockRemoveOnCommit", readLockRemoveOnCommit);
-        params.put("readLockIdempotentReleaseDelay", readLockIdempotentReleaseDelay);
+        if (readLockIdempotentReleaseDelay > 0) {
+            params.put("readLockIdempotentReleaseDelay", readLockIdempotentReleaseDelay);
+        }
         params.put("readLockIdempotentReleaseAsync", readLockIdempotentReleaseAsync);
-        params.put("readLockIdempotentReleaseAsyncPoolSize", readLockIdempotentReleaseAsyncPoolSize);
-        params.put("readLockIdempotentReleaseExecutorService", readLockIdempotentReleaseExecutorService);
+        if (readLockIdempotentReleaseAsyncPoolSize > 0) {
+            params.put("readLockIdempotentReleaseAsyncPoolSize", readLockIdempotentReleaseAsyncPoolSize);
+        }
+        if (readLockIdempotentReleaseExecutorService != null) {
+            params.put("readLockIdempotentReleaseExecutorService", readLockIdempotentReleaseExecutorService);
+        }
         return params;
     }
 
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java
index caaa15d..5089878 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java
@@ -37,14 +37,16 @@ public class GenericFileOnCompletion<T> implements Synchronization {
     private final Logger log = LoggerFactory.getLogger(GenericFileOnCompletion.class);
     private GenericFileEndpoint<T> endpoint;
     private GenericFileOperations<T> operations;
+    private GenericFileProcessStrategy<T> processStrategy;
     private ExceptionHandler exceptionHandler;
     private GenericFile<T> file;
     private String absoluteFileName;
 
-    public GenericFileOnCompletion(GenericFileEndpoint<T> endpoint, GenericFileOperations<T> operations,
+    public GenericFileOnCompletion(GenericFileEndpoint<T> endpoint, GenericFileOperations<T> operations, GenericFileProcessStrategy processStrategy,
                                    GenericFile<T> file, String absoluteFileName) {
         this.endpoint = endpoint;
         this.operations = operations;
+        this.processStrategy = processStrategy;
         this.file = file;
         this.absoluteFileName = absoluteFileName;
         this.exceptionHandler = endpoint.getOnCompletionExceptionHandler();
@@ -70,8 +72,6 @@ public class GenericFileOnCompletion<T> implements Synchronization {
     }
 
     protected void onCompletion(Exchange exchange) {
-        GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy();
-
         log.debug("Done processing file: {} using exchange: {}", file, exchange);
 
         // commit or rollback
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java
index 17f22af..0f12f00 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java
@@ -310,9 +310,6 @@ public class FileIdempotentChangedRepositoryReadLockStrategy extends ServiceSupp
             readLockIdempotentReleaseExecutorService = camelContext.getExecutorServiceManager().newScheduledThreadPool(this, "ReadLockChangedIdempotentReleaseTask", readLockIdempotentReleaseAsyncPoolSize);
             shutdownExecutorService = true;
         }
-
-        // ensure the idempotent repository is added as a service so CamelContext will stop the repo when it shutdown itself
-        camelContext.addService(idempotentRepository, true);
     }
 
     @Override
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
index a471eb6..d9a1f06 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
@@ -274,9 +274,6 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp
             readLockIdempotentReleaseExecutorService = camelContext.getExecutorServiceManager().newScheduledThreadPool(this, "ReadLockIdempotentReleaseTask", readLockIdempotentReleaseAsyncPoolSize);
             shutdownExecutorService = true;
         }
-
-        // ensure the idempotent repository is added as a service so CamelContext will stop the repo when it shutdown itself
-        camelContext.addService(idempotentRepository, true);
     }
 
     @Override
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java
index f38fe67..ce067e0 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java
@@ -19,6 +19,8 @@ package org.apache.camel.component.file.strategy;
 import java.io.File;
 import java.io.IOException;
 
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.file.GenericFile;
 import org.apache.camel.component.file.GenericFileEndpoint;
@@ -26,16 +28,29 @@ import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy;
 import org.apache.camel.component.file.GenericFileOperationFailedException;
 import org.apache.camel.component.file.GenericFileOperations;
 import org.apache.camel.component.file.GenericFileProcessStrategy;
+import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.FileUtil;
+import org.apache.camel.util.ServiceHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Base class for implementations of {@link GenericFileProcessStrategy}.
  */
-public abstract class GenericFileProcessStrategySupport<T> implements GenericFileProcessStrategy<T> {
+public abstract class GenericFileProcessStrategySupport<T> extends ServiceSupport implements GenericFileProcessStrategy<T>, CamelContextAware {
     protected final Logger log = LoggerFactory.getLogger(getClass());
     protected GenericFileExclusiveReadLockStrategy<T> exclusiveReadLockStrategy;
+    protected CamelContext camelContext;
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
 
     public void prepareOnStartup(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint) throws Exception {
         if (exclusiveReadLockStrategy != null) {
@@ -126,5 +141,23 @@ public abstract class GenericFileProcessStrategySupport<T> implements GenericFil
             log.trace("Local work file: {} was deleted: {}", local, deleted);
         }
     }
+
+    @Override
+    protected void doStart() throws Exception {
+        if (exclusiveReadLockStrategy instanceof CamelContextAware) {
+            ((CamelContextAware) exclusiveReadLockStrategy).setCamelContext(camelContext);
+        }
+        ServiceHelper.startService(exclusiveReadLockStrategy);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        ServiceHelper.stopService(exclusiveReadLockStrategy);
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        ServiceHelper.stopAndShutdownService(exclusiveReadLockStrategy);
+    }
 }
 
diff --git a/camel-core/src/test/java/org/apache/camel/component/file/NewFileConsumerTest.java b/camel-core/src/test/java/org/apache/camel/component/file/NewFileConsumerTest.java
index 0deacf8..a7cb495 100644
--- a/camel-core/src/test/java/org/apache/camel/component/file/NewFileConsumerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/component/file/NewFileConsumerTest.java
@@ -71,7 +71,7 @@ public class NewFileConsumerTest extends ContextTestSupport {
         private volatile boolean post;
 
         protected FileConsumer newFileConsumer(Processor processor, GenericFileOperations<File> operations) {
-            return new FileConsumer(this, processor, operations) {
+            return new FileConsumer(this, processor, operations, createGenericFileStrategy()) {
                 @Override
                 protected void postPollCheck(int polledMessages) {
                     post = true;
diff --git a/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileIdempotentReadLockDelayedAsyncTest.java b/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileIdempotentReadLockDelayedAsyncTest.java
new file mode 100644
index 0000000..3c9a4cd
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileIdempotentReadLockDelayedAsyncTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.strategy;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
+
+/**
+ * @version
+ */
+public class FileIdempotentReadLockDelayedAsyncTest extends ContextTestSupport {
+
+    MemoryIdempotentRepository myRepo = new MemoryIdempotentRepository();
+
+    @Override
+    protected void setUp() throws Exception {
+        deleteDirectory("target/changed/");
+        createDirectory("target/changed/in");
+        super.setUp();
+    }
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        jndi.bind("myRepo", myRepo);
+        return jndi;
+    }
+
+    public void testIdempotentReadLock() throws Exception {
+        assertEquals(0, myRepo.getCacheSize());
+
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(2).create();
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(2);
+        // both of them should be done without 2 sequential delays of 1 sec
+        mock.message(0).arrives().between(0, 1400).millis();
+        mock.message(1).arrives().between(0, 1400).millis();
+
+        template.sendBodyAndHeader("file:target/changed/in", "Hello World", Exchange.FILE_NAME, "hello.txt");
+        template.sendBodyAndHeader("file:target/changed/in", "Bye World", Exchange.FILE_NAME, "bye.txt");
+
+        assertMockEndpointsSatisfied();
+
+        assertTrue(notify.matches(10, TimeUnit.SECONDS));
+
+        // the files are kept on commit
+        // if you want to remove them then the idempotent repo need some way to evict idle keys
+        assertEquals(2, myRepo.getCacheSize());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("file:target/changed/in?initialDelay=0&delay=10&readLock=idempotent&readLockIdempotentReleaseDelay=1000&readLockIdempotentReleaseAsync=true&idempotentRepository=#myRepo")
+                    .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            // we are in progress
+                            int size = myRepo.getCacheSize();
+                            assertTrue(size == 1 || size == 2);
+                        }
+                    })
+                    .to("mock:result");
+            }
+        };
+    }
+}
diff --git a/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileIdempotentReadLockDelayedTest.java b/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileIdempotentReadLockDelayedTest.java
new file mode 100644
index 0000000..45b8efb
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileIdempotentReadLockDelayedTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.strategy;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
+
+/**
+ * @version
+ */
+public class FileIdempotentReadLockDelayedTest extends ContextTestSupport {
+
+    MemoryIdempotentRepository myRepo = new MemoryIdempotentRepository();
+
+    @Override
+    protected void setUp() throws Exception {
+        deleteDirectory("target/changed/");
+        createDirectory("target/changed/in");
+        super.setUp();
+    }
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        jndi.bind("myRepo", myRepo);
+        return jndi;
+    }
+
+    public void testIdempotentReadLock() throws Exception {
+        assertEquals(0, myRepo.getCacheSize());
+
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(2).create();
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(2);
+        mock.message(0).arrives().between(0, 1400).millis();
+        mock.message(1).arrives().between(800, 1800).millis();
+
+        template.sendBodyAndHeader("file:target/changed/in", "Hello World", Exchange.FILE_NAME, "hello.txt");
+        template.sendBodyAndHeader("file:target/changed/in", "Bye World", Exchange.FILE_NAME, "bye.txt");
+
+        assertMockEndpointsSatisfied();
+
+        assertTrue(notify.matches(10, TimeUnit.SECONDS));
+
+        // the files are kept on commit
+        // if you want to remove them then the idempotent repo need some way to evict idle keys
+        assertEquals(2, myRepo.getCacheSize());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("file:target/changed/in?initialDelay=0&delay=10&readLock=idempotent&readLockIdempotentReleaseDelay=1000&idempotentRepository=#myRepo")
+                    .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            // we are in progress
+                            int size = myRepo.getCacheSize();
+                            assertTrue(size == 1 || size == 2);
+                        }
+                    })
+                    .to("mock:result");
+            }
+        };
+    }
+}
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ShutdownDeferTest.java b/camel-core/src/test/java/org/apache/camel/processor/ShutdownDeferTest.java
index a23fe67..4d0d81a 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ShutdownDeferTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ShutdownDeferTest.java
@@ -26,6 +26,7 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.file.FileConsumer;
 import org.apache.camel.component.file.FileEndpoint;
 import org.apache.camel.component.file.GenericFileOperations;
+import org.apache.camel.component.file.strategy.GenericFileRenameProcessStrategy;
 import org.apache.camel.component.mock.MockEndpoint;
 
 import static org.apache.camel.ShutdownRoute.Defer;
@@ -95,7 +96,7 @@ public class ShutdownDeferTest extends ContextTestSupport {
 
         @Override
         protected FileConsumer newFileConsumer(Processor processor, GenericFileOperations<File> operations) {
-            return new FileConsumer(this, processor, operations) {
+            return new FileConsumer(this, processor, operations, createGenericFileStrategy()) {
                 @Override
                 protected void doSuspend() throws Exception {
                     CONSUMER_SUSPENDED.set(true);
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ShutdownNotDeferTest.java b/camel-core/src/test/java/org/apache/camel/processor/ShutdownNotDeferTest.java
index 5f62167..2b5e8fe 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ShutdownNotDeferTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ShutdownNotDeferTest.java
@@ -26,6 +26,7 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.file.FileConsumer;
 import org.apache.camel.component.file.FileEndpoint;
 import org.apache.camel.component.file.GenericFileOperations;
+import org.apache.camel.component.file.strategy.GenericFileNoOpProcessStrategy;
 import org.apache.camel.component.mock.MockEndpoint;
 
 import static org.apache.camel.ShutdownRoute.Default;
@@ -89,7 +90,7 @@ public class ShutdownNotDeferTest extends ContextTestSupport {
 
         @Override
         protected FileConsumer newFileConsumer(Processor processor, GenericFileOperations<File> operations) {
-            return new FileConsumer(this, processor, operations) {
+            return new FileConsumer(this, processor, operations, createGenericFileStrategy()) {
                 @Override
                 protected void doSuspend() throws Exception {
                     CONSUMER_SUSPENDED.set(true);

-- 
To stop receiving notification emails like this one, please contact
davsclaus@apache.org.