You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/05/03 11:04:01 UTC

[07/13] camel git commit: CAMEL-8727: File consumer - Add read lock that is based on idempotent repository

CAMEL-8727: File consumer - Add read lock that is based on idempotent repository


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f24eee8b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f24eee8b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f24eee8b

Branch: refs/heads/master
Commit: f24eee8beb158c0ef2c259e2fb22369f79f53bb9
Parents: 9f86d16
Author: Claus Ibsen <da...@apache.org>
Authored: Sun May 3 09:32:07 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun May 3 10:52:36 2015 +0200

----------------------------------------------------------------------
 .../component/file/GenericFileEndpoint.java     | 33 +++++++++++++++++-
 ...ileIdempotentRepositoryReadLockStrategy.java | 36 ++++++++++++++++++--
 .../strategy/FileProcessStrategyFactory.java    |  4 +++
 3 files changed, 70 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/f24eee8b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
index 0eafd12..48512e9 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
@@ -169,6 +169,8 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
     @UriParam(label = "consumer", defaultValue = "0")
     protected long readLockMinAge;
     @UriParam(label = "consumer", defaultValue = "true")
+    protected boolean readLockRemoveOnCommit = true;
+    @UriParam(label = "consumer", defaultValue = "true")
     protected boolean readLockRemoveOnRollback = true;
     @UriParam(label = "consumer")
     protected GenericFileExclusiveReadLockStrategy<T> exclusiveReadLockStrategy;
@@ -904,7 +906,7 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
 
     /**
      * This option applied only for readLock=change.
-     * This options allows to specify a minimum age the file must be before attempting to acquire the read lock.
+     * This option allows to specify a minimum age the file must be before attempting to acquire the read lock.
      * For example use readLockMinAge=300s to require the file is at last 5 minutes old.
      * This can speedup the changed read lock as it will only attempt to acquire files which are at least that given age.
      */
@@ -912,6 +914,34 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
         this.readLockMinAge = readLockMinAge;
     }
 
+    public boolean isReadLockRemoveOnCommit() {
+        return readLockRemoveOnCommit;
+    }
+
+    /**
+     * This option applied only for readLock=idempotent.
+     * This option allows to specify whether to remove the file name entry from the idempotent repository
+     * when the file was processed successfully and is committed. Setting this to <tt>false</tt> allows
+     * to use the read lock as both read lock and idempotent consumer at the same time, as previously
+     * processed file will be kept in the idempotent repository so the same file is not processed again.
+     */
+    public void setReadLockRemoveOnCommit(boolean readLockRemoveOnCommit) {
+        this.readLockRemoveOnCommit = readLockRemoveOnCommit;
+    }
+
+    public boolean isReadLockRemoveOnRollback() {
+        return readLockRemoveOnRollback;
+    }
+
+    /**
+     * This option applied only for readLock=idempotent.
+     * This option allows to specify whether to remove the file name entry from the idempotent repository
+     * when processing the file failed and a rollback happens.
+     */
+    public void setReadLockRemoveOnRollback(boolean readLockRemoveOnRollback) {
+        this.readLockRemoveOnRollback = readLockRemoveOnRollback;
+    }
+
     public int getBufferSize() {
         return bufferSize;
     }
@@ -1225,6 +1255,7 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
         params.put("readLockMinLength", readLockMinLength);
         params.put("readLockLoggingLevel", readLockLoggingLevel);
         params.put("readLockMinAge", readLockMinAge);
+        params.put("readLockRemoveOnCommit", readLockRemoveOnCommit);
         params.put("readLockRemoveOnRollback", readLockRemoveOnRollback);
         return params;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/f24eee8b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
index 8e043b8..28f4865 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
@@ -42,13 +42,16 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp
 
     private static final transient Logger LOG = LoggerFactory.getLogger(FileIdempotentRepositoryReadLockStrategy.class);
 
+    private GenericFileEndpoint<File> endpoint;
     private LoggingLevel loggingLevel = LoggingLevel.TRACE;
     private CamelContext camelContext;
     private IdempotentRepository<String> idempotentRepository;
     private boolean removeOnRollback = true;
+    private boolean removeOnCommit = true;
 
     @Override
     public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) throws Exception {
+        this.endpoint = endpoint;
         LOG.info("Using FileIdempotentRepositoryReadLockStrategy: {} on endpoint: {}", idempotentRepository, endpoint);
     }
 
@@ -80,7 +83,12 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp
     public void releaseExclusiveReadLockOnCommit(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
         String key = asKey(file);
         CamelLogger.log(LOG, loggingLevel, "releaseExclusiveReadLockOnCommit: " + key);
-        idempotentRepository.contains(key);
+        if (removeOnCommit) {
+            idempotentRepository.remove(key);
+        } else {
+            // if not remove then confirm
+            idempotentRepository.confirm(key);
+        }
     }
 
     public void setTimeout(long timeout) {
@@ -139,8 +147,32 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp
         this.removeOnRollback = removeOnRollback;
     }
 
+    /**
+     * Whether to remove the file from the idempotent repository when doing a commit.
+     * <p/>
+     * By default this is commit.
+     */
+    public boolean isRemoveOnCommit() {
+        return removeOnCommit;
+    }
+
+    /**
+     * Whether to remove the file from the idempotent repository when doing a commit.
+     * <p/>
+     * By default this is commit.
+     */
+    public void setRemoveOnCommit(boolean removeOnCommit) {
+        this.removeOnCommit = removeOnCommit;
+    }
+
     protected String asKey(GenericFile<File> file) {
-        return file.getAbsoluteFilePath();
+        // use absolute file path as default key, but evaluate if an expression key was configured
+        String key = file.getAbsoluteFilePath();
+        if (endpoint.getIdempotentKey() != null) {
+            Exchange dummy = endpoint.createExchange(file);
+            key = endpoint.getIdempotentKey().evaluate(dummy, String.class);
+        }
+        return key;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/f24eee8b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
index 5a31374..6987905 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
@@ -130,6 +130,10 @@ public final class FileProcessStrategyFactory {
                 strategy = readLockStrategy;
             } else if ("idempotent".equals(readLock)) {
                 FileIdempotentRepositoryReadLockStrategy readLockStrategy = new FileIdempotentRepositoryReadLockStrategy();
+                Boolean readLockRemoveOnCommit = (Boolean) params.get("readLockRemoveOnCommit");
+                if (readLockRemoveOnCommit != null) {
+                    readLockStrategy.setRemoveOnCommit(readLockRemoveOnCommit);
+                }
                 Boolean readLockRemoveOnRollback = (Boolean) params.get("readLockRemoveOnRollback");
                 if (readLockRemoveOnRollback != null) {
                     readLockStrategy.setRemoveOnRollback(readLockRemoveOnRollback);