You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/05/04 19:59:43 UTC
[4/6] camel git commit: CAMEL-8727: File consumer - Add read lock
that is based on idempotent repository
CAMEL-8727: File consumer - Add read lock that is based on idempotent repository
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/504b0d84
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/504b0d84
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/504b0d84
Branch: refs/heads/master
Commit: 504b0d84078944c7e632316d57a58fcb220ca494
Parents: 0fa7d69
Author: Claus Ibsen <da...@apache.org>
Authored: Mon May 4 19:40:20 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon May 4 19:40:31 2015 +0200
----------------------------------------------------------------------
.../component/file/GenericFileEndpoint.java | 20 +++++++++++++++
...ileIdempotentRepositoryReadLockStrategy.java | 27 ++++++++++++++++++--
.../strategy/FileProcessStrategyFactory.java | 4 +++
3 files changed, 49 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/504b0d84/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
index 9cd7b4f..ae5381b 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
@@ -173,6 +173,8 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
@UriParam(label = "consumer", defaultValue = "true")
protected boolean readLockRemoveOnRollback = true;
@UriParam(label = "consumer")
+ protected boolean readLockRemoveOnCommit;
+ @UriParam(label = "consumer")
protected GenericFileExclusiveReadLockStrategy<T> exclusiveReadLockStrategy;
@UriParam(label = "consumer")
protected ExceptionHandler onCompletionExceptionHandler;
@@ -942,6 +944,23 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
this.readLockRemoveOnRollback = readLockRemoveOnRollback;
}
+ public boolean isReadLockRemoveOnCommit() {
+ return readLockRemoveOnCommit;
+ }
+
+ /**
+ * This option applies only when readLock=idempotent.
+ * It specifies whether to remove the file name entry from the idempotent repository
+ * when processing the file has succeeded and a commit happens.
+ * <p/>
+ * By default the file name entry is not removed, which ensures that no race condition can occur in which another active
+ * node attempts to grab the file. Instead, the idempotent repository may support eviction strategies
+ * that you can configure to evict the file name entry after X minutes - this avoids problems with race conditions.
+ */
+ public void setReadLockRemoveOnCommit(boolean readLockRemoveOnCommit) {
+ this.readLockRemoveOnCommit = readLockRemoveOnCommit;
+ }
+
public int getBufferSize() {
return bufferSize;
}
@@ -1256,6 +1275,7 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
params.put("readLockLoggingLevel", readLockLoggingLevel);
params.put("readLockMinAge", readLockMinAge);
params.put("readLockRemoveOnRollback", readLockRemoveOnRollback);
+ params.put("readLockRemoveOnCommit", readLockRemoveOnCommit);
return params;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/504b0d84/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
index 763b7e0..b9cf193 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
@@ -47,6 +47,7 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp
private CamelContext camelContext;
private IdempotentRepository<String> idempotentRepository;
private boolean removeOnRollback = true;
+ private boolean removeOnCommit;
@Override
public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) throws Exception {
@@ -91,8 +92,12 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp
@Override
public void releaseExclusiveReadLockOnCommit(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
String key = asKey(file);
- // confirm on commit
- idempotentRepository.confirm(key);
+ if (removeOnCommit) {
+ idempotentRepository.remove(key);
+ } else {
+ // confirm on commit
+ idempotentRepository.confirm(key);
+ }
}
public void setTimeout(long timeout) {
@@ -151,6 +156,24 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp
this.removeOnRollback = removeOnRollback;
}
+ /**
+ * Whether to remove the file from the idempotent repository when doing a commit.
+ * <p/>
+ * By default this is false.
+ */
+ public boolean isRemoveOnCommit() {
+ return removeOnCommit;
+ }
+
+ /**
+ * Whether to remove the file from the idempotent repository when doing a commit.
+ * <p/>
+ * By default this is false.
+ */
+ public void setRemoveOnCommit(boolean removeOnCommit) {
+ this.removeOnCommit = removeOnCommit;
+ }
+
protected String asKey(GenericFile<File> file) {
// use absolute file path as default key, but evaluate if an expression key was configured
String key = file.getAbsoluteFilePath();
http://git-wip-us.apache.org/repos/asf/camel/blob/504b0d84/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
index 5a31374..f9ceca2 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
@@ -134,6 +134,10 @@ public final class FileProcessStrategyFactory {
if (readLockRemoveOnRollback != null) {
readLockStrategy.setRemoveOnRollback(readLockRemoveOnRollback);
}
+ Boolean readLockRemoveOnCommit = (Boolean) params.get("readLockRemoveOnCommit");
+ if (readLockRemoveOnCommit != null) {
+ readLockStrategy.setRemoveOnCommit(readLockRemoveOnCommit);
+ }
IdempotentRepository repo = (IdempotentRepository) params.get("readLockIdempotentRepository");
if (repo != null) {
readLockStrategy.setIdempotentRepository(repo);