You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/05/04 19:59:43 UTC

[4/6] camel git commit: CAMEL-8727: File consumer - Add read lock that is based on idempotent repository

CAMEL-8727: File consumer - Add read lock that is based on idempotent repository


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/504b0d84
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/504b0d84
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/504b0d84

Branch: refs/heads/master
Commit: 504b0d84078944c7e632316d57a58fcb220ca494
Parents: 0fa7d69
Author: Claus Ibsen <da...@apache.org>
Authored: Mon May 4 19:40:20 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon May 4 19:40:31 2015 +0200

----------------------------------------------------------------------
 .../component/file/GenericFileEndpoint.java     | 20 +++++++++++++++
 ...ileIdempotentRepositoryReadLockStrategy.java | 27 ++++++++++++++++++--
 .../strategy/FileProcessStrategyFactory.java    |  4 +++
 3 files changed, 49 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/504b0d84/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
index 9cd7b4f..ae5381b 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
@@ -173,6 +173,8 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
     @UriParam(label = "consumer", defaultValue = "true")
     protected boolean readLockRemoveOnRollback = true;
     @UriParam(label = "consumer")
+    protected boolean readLockRemoveOnCommit;
+    @UriParam(label = "consumer")
     protected GenericFileExclusiveReadLockStrategy<T> exclusiveReadLockStrategy;
     @UriParam(label = "consumer")
     protected ExceptionHandler onCompletionExceptionHandler;
@@ -942,6 +944,23 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
         this.readLockRemoveOnRollback = readLockRemoveOnRollback;
     }
 
+    public boolean isReadLockRemoveOnCommit() {
+        return readLockRemoveOnCommit;
+    }
+
+    /**
+     * This option applies only to readLock=idempotent.
+     * This option allows you to specify whether to remove the file name entry from the idempotent repository
+     * when processing the file has succeeded and a commit happens.
+     * <p/>
+     * By default the file name entry is not removed, which avoids a race condition where another active
+     * node may attempt to grab the file. Instead the idempotent repository may support eviction strategies
+     * that you can configure to evict the file name entry after X minutes - this ensures no problems with race conditions.
+     */
+    public void setReadLockRemoveOnCommit(boolean readLockRemoveOnCommit) {
+        this.readLockRemoveOnCommit = readLockRemoveOnCommit;
+    }
+
     public int getBufferSize() {
         return bufferSize;
     }
@@ -1256,6 +1275,7 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
         params.put("readLockLoggingLevel", readLockLoggingLevel);
         params.put("readLockMinAge", readLockMinAge);
         params.put("readLockRemoveOnRollback", readLockRemoveOnRollback);
+        params.put("readLockRemoveOnCommit", readLockRemoveOnCommit);
         return params;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/504b0d84/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
index 763b7e0..b9cf193 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
@@ -47,6 +47,7 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp
     private CamelContext camelContext;
     private IdempotentRepository<String> idempotentRepository;
     private boolean removeOnRollback = true;
+    private boolean removeOnCommit;
 
     @Override
     public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) throws Exception {
@@ -91,8 +92,12 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp
     @Override
     public void releaseExclusiveReadLockOnCommit(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
         String key = asKey(file);
-        // confirm on commit
-        idempotentRepository.confirm(key);
+        if (removeOnCommit) {
+            idempotentRepository.remove(key);
+        } else {
+            // confirm on commit
+            idempotentRepository.confirm(key);
+        }
     }
 
     public void setTimeout(long timeout) {
@@ -151,6 +156,24 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp
         this.removeOnRollback = removeOnRollback;
     }
 
+    /**
+     * Whether to remove the file from the idempotent repository when doing a commit.
+     * <p/>
+     * By default this is false.
+     */
+    public boolean isRemoveOnCommit() {
+        return removeOnCommit;
+    }
+
+    /**
+     * Whether to remove the file from the idempotent repository when doing a commit.
+     * <p/>
+     * By default this is false.
+     */
+    public void setRemoveOnCommit(boolean removeOnCommit) {
+        this.removeOnCommit = removeOnCommit;
+    }
+
     protected String asKey(GenericFile<File> file) {
         // use absolute file path as default key, but evaluate if an expression key was configured
         String key = file.getAbsoluteFilePath();

http://git-wip-us.apache.org/repos/asf/camel/blob/504b0d84/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
index 5a31374..f9ceca2 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
@@ -134,6 +134,10 @@ public final class FileProcessStrategyFactory {
                 if (readLockRemoveOnRollback != null) {
                     readLockStrategy.setRemoveOnRollback(readLockRemoveOnRollback);
                 }
+                Boolean readLockRemoveOnCommit = (Boolean) params.get("readLockRemoveOnCommit");
+                if (readLockRemoveOnCommit != null) {
+                    readLockStrategy.setRemoveOnCommit(readLockRemoveOnCommit);
+                }
                 IdempotentRepository repo = (IdempotentRepository) params.get("readLockIdempotentRepository");
                 if (repo != null) {
                     readLockStrategy.setIdempotentRepository(repo);