You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/05/03 11:04:01 UTC
[07/13] camel git commit: CAMEL-8727: File consumer - Add read lock
that is based on idempotent repository
CAMEL-8727: File consumer - Add read lock that is based on idempotent repository
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f24eee8b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f24eee8b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f24eee8b
Branch: refs/heads/master
Commit: f24eee8beb158c0ef2c259e2fb22369f79f53bb9
Parents: 9f86d16
Author: Claus Ibsen <da...@apache.org>
Authored: Sun May 3 09:32:07 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun May 3 10:52:36 2015 +0200
----------------------------------------------------------------------
.../component/file/GenericFileEndpoint.java | 33 +++++++++++++++++-
...ileIdempotentRepositoryReadLockStrategy.java | 36 ++++++++++++++++++--
.../strategy/FileProcessStrategyFactory.java | 4 +++
3 files changed, 70 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/f24eee8b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
index 0eafd12..48512e9 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
@@ -169,6 +169,8 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
@UriParam(label = "consumer", defaultValue = "0")
protected long readLockMinAge;
@UriParam(label = "consumer", defaultValue = "true")
+ protected boolean readLockRemoveOnCommit = true;
+ @UriParam(label = "consumer", defaultValue = "true")
protected boolean readLockRemoveOnRollback = true;
@UriParam(label = "consumer")
protected GenericFileExclusiveReadLockStrategy<T> exclusiveReadLockStrategy;
@@ -904,7 +906,7 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
/**
* This option applied only for readLock=change.
- * This options allows to specify a minimum age the file must be before attempting to acquire the read lock.
+ * This option allows to specify a minimum age the file must be before attempting to acquire the read lock.
* For example use readLockMinAge=300s to require the file is at last 5 minutes old.
* This can speedup the changed read lock as it will only attempt to acquire files which are at least that given age.
*/
@@ -912,6 +914,34 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
this.readLockMinAge = readLockMinAge;
}
+ public boolean isReadLockRemoveOnCommit() {
+ return readLockRemoveOnCommit;
+ }
+
+ /**
+ * This option applied only for readLock=idempotent.
+ * This option allows to specify whether to remove the file name entry from the idempotent repository
+ * when the file was processed successfully and is committed. Setting this to <tt>false</tt> allows
+ * to use the read lock as both read lock and idempotent consumer at the same time, as previously
+ * processed file will be kept in the idempotent repository so the same file is not processed again.
+ */
+ public void setReadLockRemoveOnCommit(boolean readLockRemoveOnCommit) {
+ this.readLockRemoveOnCommit = readLockRemoveOnCommit;
+ }
+
+ public boolean isReadLockRemoveOnRollback() {
+ return readLockRemoveOnRollback;
+ }
+
+ /**
+ * This option applied only for readLock=idempotent.
+ * This option allows to specify whether to remove the file name entry from the idempotent repository
+ * when processing the file failed and a rollback happens.
+ */
+ public void setReadLockRemoveOnRollback(boolean readLockRemoveOnRollback) {
+ this.readLockRemoveOnRollback = readLockRemoveOnRollback;
+ }
+
public int getBufferSize() {
return bufferSize;
}
@@ -1225,6 +1255,7 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
params.put("readLockMinLength", readLockMinLength);
params.put("readLockLoggingLevel", readLockLoggingLevel);
params.put("readLockMinAge", readLockMinAge);
+ params.put("readLockRemoveOnCommit", readLockRemoveOnCommit);
params.put("readLockRemoveOnRollback", readLockRemoveOnRollback);
return params;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/f24eee8b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
index 8e043b8..28f4865 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
@@ -42,13 +42,16 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp
private static final transient Logger LOG = LoggerFactory.getLogger(FileIdempotentRepositoryReadLockStrategy.class);
+ private GenericFileEndpoint<File> endpoint;
private LoggingLevel loggingLevel = LoggingLevel.TRACE;
private CamelContext camelContext;
private IdempotentRepository<String> idempotentRepository;
private boolean removeOnRollback = true;
+ private boolean removeOnCommit = true;
@Override
public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) throws Exception {
+ this.endpoint = endpoint;
LOG.info("Using FileIdempotentRepositoryReadLockStrategy: {} on endpoint: {}", idempotentRepository, endpoint);
}
@@ -80,7 +83,12 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp
public void releaseExclusiveReadLockOnCommit(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
String key = asKey(file);
CamelLogger.log(LOG, loggingLevel, "releaseExclusiveReadLockOnCommit: " + key);
- idempotentRepository.contains(key);
+ if (removeOnCommit) {
+ idempotentRepository.remove(key);
+ } else {
+ // if not remove then confirm
+ idempotentRepository.confirm(key);
+ }
}
public void setTimeout(long timeout) {
@@ -139,8 +147,32 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp
this.removeOnRollback = removeOnRollback;
}
+ /**
+ * Whether to remove the file from the idempotent repository when doing a commit.
+ * <p/>
+ * By default this is commit.
+ */
+ public boolean isRemoveOnCommit() {
+ return removeOnCommit;
+ }
+
+ /**
+ * Whether to remove the file from the idempotent repository when doing a commit.
+ * <p/>
+ * By default this is commit.
+ */
+ public void setRemoveOnCommit(boolean removeOnCommit) {
+ this.removeOnCommit = removeOnCommit;
+ }
+
protected String asKey(GenericFile<File> file) {
- return file.getAbsoluteFilePath();
+ // use absolute file path as default key, but evaluate if an expression key was configured
+ String key = file.getAbsoluteFilePath();
+ if (endpoint.getIdempotentKey() != null) {
+ Exchange dummy = endpoint.createExchange(file);
+ key = endpoint.getIdempotentKey().evaluate(dummy, String.class);
+ }
+ return key;
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/f24eee8b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
index 5a31374..6987905 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
@@ -130,6 +130,10 @@ public final class FileProcessStrategyFactory {
strategy = readLockStrategy;
} else if ("idempotent".equals(readLock)) {
FileIdempotentRepositoryReadLockStrategy readLockStrategy = new FileIdempotentRepositoryReadLockStrategy();
+ Boolean readLockRemoveOnCommit = (Boolean) params.get("readLockRemoveOnCommit");
+ if (readLockRemoveOnCommit != null) {
+ readLockStrategy.setRemoveOnCommit(readLockRemoveOnCommit);
+ }
Boolean readLockRemoveOnRollback = (Boolean) params.get("readLockRemoveOnRollback");
if (readLockRemoveOnRollback != null) {
readLockStrategy.setRemoveOnRollback(readLockRemoveOnRollback);