You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/05/03 11:03:56 UTC
[02/13] camel git commit: CAMEL-8727: File consumer - Add read lock
that is based on idempotent repository
CAMEL-8727: File consumer - Add read lock that is based on idempotent repository
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/78ccf133
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/78ccf133
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/78ccf133
Branch: refs/heads/master
Commit: 78ccf13325d4e159df9161b68e0fc7602de5e6d8
Parents: 1c20c43
Author: Claus Ibsen <da...@apache.org>
Authored: Fri May 1 14:42:33 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun May 3 10:52:34 2015 +0200
----------------------------------------------------------------------
.../component/file/GenericFileEndpoint.java | 20 ++-
.../GenericFileExclusiveReadLockStrategy.java | 25 ++-
...ileIdempotentRepositoryReadLockStrategy.java | 160 +++++++++++++++++++
.../FileLockExclusiveReadLockStrategy.java | 9 +-
.../strategy/FileProcessStrategyFactory.java | 12 ++
.../FileRenameExclusiveReadLockStrategy.java | 30 +++-
.../GenericFileProcessStrategySupport.java | 6 +-
...ericFileRenameExclusiveReadLockStrategy.java | 13 +-
.../GenericFileRenameProcessStrategy.java | 2 +-
.../MarkerFileExclusiveReadLockStrategy.java | 18 ++-
10 files changed, 272 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/78ccf133/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
index 8f1d458..0eafd12 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
@@ -154,7 +154,7 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
protected Comparator<GenericFile<T>> sorter;
@UriParam(label = "consumer")
protected Comparator<Exchange> sortBy;
- @UriParam(label = "consumer", enums = "none,markerFile,fileLock,rename,changed")
+ @UriParam(label = "consumer", enums = "none,markerFile,fileLock,rename,changed,idempotent")
protected String readLock = "none";
@UriParam(label = "consumer", defaultValue = "1000")
protected long readLockCheckInterval = 1000;
@@ -168,6 +168,8 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
protected long readLockMinLength = 1;
@UriParam(label = "consumer", defaultValue = "0")
protected long readLockMinAge;
+ @UriParam(label = "consumer", defaultValue = "true")
+ protected boolean readLockRemoveOnRollback = true;
@UriParam(label = "consumer")
protected GenericFileExclusiveReadLockStrategy<T> exclusiveReadLockStrategy;
@UriParam(label = "consumer")
@@ -1210,6 +1212,9 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
if (readLock != null) {
params.put("readLock", readLock);
}
+ if ("idempotent".equals(readLock)) {
+ params.put("readLockIdempotentRepository", idempotentRepository);
+ }
if (readLockCheckInterval > 0) {
params.put("readLockCheckInterval", readLockCheckInterval);
}
@@ -1220,7 +1225,7 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
params.put("readLockMinLength", readLockMinLength);
params.put("readLockLoggingLevel", readLockLoggingLevel);
params.put("readLockMinAge", readLockMinAge);
-
+ params.put("readLockRemoveOnRollback", readLockRemoveOnRollback);
return params;
}
@@ -1331,14 +1336,21 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
+ " to ensure that the read lock procedure has enough time to acquire the lock.");
}
}
+ if ("idempotent".equals(readLock) && idempotentRepository == null) {
+ throw new IllegalArgumentException("IdempotentRepository must be configured when using readLock=idempotent");
+ }
- ServiceHelper.startServices(inProgressRepository, idempotentRepository);
+ // idempotent repository may be used by others, so add it as a service so it's stopped when CamelContext stops
+ if (idempotentRepository != null) {
+ getCamelContext().addService(idempotentRepository, true);
+ }
+ ServiceHelper.startServices(inProgressRepository);
super.doStart();
}
@Override
protected void doStop() throws Exception {
super.doStop();
- ServiceHelper.stopServices(inProgressRepository, idempotentRepository);
+ ServiceHelper.stopServices(inProgressRepository);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/78ccf133/camel-core/src/main/java/org/apache/camel/component/file/GenericFileExclusiveReadLockStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileExclusiveReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileExclusiveReadLockStrategy.java
index 36875f5..cf950d2 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileExclusiveReadLockStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileExclusiveReadLockStrategy.java
@@ -31,6 +31,7 @@ import org.apache.camel.LoggingLevel;
* <li>FileLockExclusiveReadLockStrategy acquiring a RW file lock for the duration of the processing.</li>
* <li>MarkerFileExclusiveReadLockStrategy using a marker file for acquiring read lock.</li>
* <li>FileChangedExclusiveReadLockStrategy using a file changed detection for acquiring read lock.</li>
+ * <li>FileIdempotentRepositoryReadLockStrategy using a {@link org.apache.camel.spi.IdempotentRepository} to hold the read locks, which allows clustering to be supported.</li>
* </ul>
*/
public interface GenericFileExclusiveReadLockStrategy<T> {
@@ -57,14 +58,34 @@ public interface GenericFileExclusiveReadLockStrategy<T> {
boolean acquireExclusiveReadLock(GenericFileOperations<T> operations, GenericFile<T> file, Exchange exchange) throws Exception;
/**
- * Releases the exclusive read lock granted by the <tt>acquireExclusiveReadLock</tt> method.
+ * Releases the exclusive read lock granted by the <tt>acquireExclusiveReadLock</tt> method due to an abort operation (acquireExclusiveReadLock returned false).
*
* @param operations generic file operations
* @param file the file
* @param exchange the exchange
* @throws Exception can be thrown in case of errors
*/
- void releaseExclusiveReadLock(GenericFileOperations<T> operations, GenericFile<T> file, Exchange exchange) throws Exception;
+ void releaseExclusiveReadLockOnAbort(GenericFileOperations<T> operations, GenericFile<T> file, Exchange exchange) throws Exception;
+
+ /**
+ * Releases the exclusive read lock granted by the <tt>acquireExclusiveReadLock</tt> method due to a rollback operation (Exchange processing failed)
+ *
+ * @param operations generic file operations
+ * @param file the file
+ * @param exchange the exchange
+ * @throws Exception can be thrown in case of errors
+ */
+ void releaseExclusiveReadLockOnRollback(GenericFileOperations<T> operations, GenericFile<T> file, Exchange exchange) throws Exception;
+
+ /**
+ * Releases the exclusive read lock granted by the <tt>acquireExclusiveReadLock</tt> method due to a commit operation (Exchange processing succeeded)
+ *
+ * @param operations generic file operations
+ * @param file the file
+ * @param exchange the exchange
+ * @throws Exception can be thrown in case of errors
+ */
+ void releaseExclusiveReadLockOnCommit(GenericFileOperations<T> operations, GenericFile<T> file, Exchange exchange) throws Exception;
/**
* Sets an optional timeout period.
http://git-wip-us.apache.org/repos/asf/camel/blob/78ccf133/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
new file mode 100644
index 0000000..8e043b8
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.strategy;
+
+import java.io.File;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.component.file.GenericFile;
+import org.apache.camel.component.file.GenericFileEndpoint;
+import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy;
+import org.apache.camel.component.file.GenericFileOperations;
+import org.apache.camel.spi.IdempotentRepository;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.CamelLogger;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A file read lock that uses an {@link org.apache.camel.spi.IdempotentRepository} as the lock store. This allows plugging in
+ * existing idempotent repositories that, for example, support clustering. The other read lock strategies, which use marker
+ * files or file locks, are not guaranteed to work in a clustered setup across various platforms and file systems.
+ * <p/>
+ * The read lock is keyed by the absolute file path and is granted only when
+ * {@link org.apache.camel.spi.IdempotentRepository#add(Object)} returns <tt>true</tt> for that key.
+ */
+public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport implements GenericFileExclusiveReadLockStrategy<File>, CamelContextAware {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileIdempotentRepositoryReadLockStrategy.class);
+
+    private LoggingLevel loggingLevel = LoggingLevel.TRACE;
+    private CamelContext camelContext;
+    private IdempotentRepository<String> idempotentRepository;
+    private boolean removeOnRollback = true;
+
+    /**
+     * Logs which idempotent repository is used as the read lock store for the given endpoint.
+     *
+     * @param operations generic file operations (not used)
+     * @param endpoint   the endpoint this strategy is used on
+     * @throws Exception can be thrown in case of errors
+     */
+    @Override
+    public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) throws Exception {
+        LOG.info("Using FileIdempotentRepositoryReadLockStrategy: {} on endpoint: {}", idempotentRepository, endpoint);
+    }
+
+    /**
+     * Tries to acquire the read lock by adding the key of the file to the idempotent repository.
+     *
+     * @param operations generic file operations (not used)
+     * @param file       the file to lock
+     * @param exchange   the exchange (not used)
+     * @return the result of adding the key to the repository; <tt>true</tt> means the lock was granted
+     * @throws Exception can be thrown in case of errors
+     */
+    @Override
+    public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
+        // check if we can begin on this file
+        String key = asKey(file);
+        boolean answer = idempotentRepository.add(key);
+        CamelLogger.log(LOG, loggingLevel, "acquireExclusiveReadLock: " + key + " -> " + answer);
+        return answer;
+    }
+
+    /**
+     * Called when the lock could not be acquired. Only logs the key; the repository is left untouched.
+     *
+     * @param operations generic file operations (not used)
+     * @param file       the file
+     * @param exchange   the exchange (not used)
+     * @throws Exception can be thrown in case of errors
+     */
+    @Override
+    public void releaseExclusiveReadLockOnAbort(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
+        String key = asKey(file);
+        CamelLogger.log(LOG, loggingLevel, "releaseExclusiveReadLockOnAbort: " + key);
+    }
+
+    /**
+     * Releases the lock after a failed exchange by removing the key from the repository,
+     * but only when {@link #isRemoveOnRollback()} is enabled.
+     *
+     * @param operations generic file operations (not used)
+     * @param file       the file
+     * @param exchange   the exchange (not used)
+     * @throws Exception can be thrown in case of errors
+     */
+    @Override
+    public void releaseExclusiveReadLockOnRollback(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
+        if (removeOnRollback) {
+            String key = asKey(file);
+            CamelLogger.log(LOG, loggingLevel, "releaseExclusiveReadLockOnRollback: " + key);
+            idempotentRepository.remove(key);
+        }
+    }
+
+    /**
+     * Completes the lock after a successful exchange by confirming the key in the repository.
+     *
+     * @param operations generic file operations (not used)
+     * @param file       the file
+     * @param exchange   the exchange (not used)
+     * @throws Exception can be thrown in case of errors
+     */
+    @Override
+    public void releaseExclusiveReadLockOnCommit(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
+        String key = asKey(file);
+        CamelLogger.log(LOG, loggingLevel, "releaseExclusiveReadLockOnCommit: " + key);
+        // confirm the key; merely querying the repository with contains(key) would have no effect here
+        idempotentRepository.confirm(key);
+    }
+
+    public void setTimeout(long timeout) {
+        // noop
+    }
+
+    public void setCheckInterval(long checkInterval) {
+        // noop
+    }
+
+    public void setReadLockLoggingLevel(LoggingLevel readLockLoggingLevel) {
+        this.loggingLevel = readLockLoggingLevel;
+    }
+
+    public void setMarkerFiler(boolean markerFile) {
+        // noop
+    }
+
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    /**
+     * The idempotent repository to use as the store for the read locks.
+     */
+    public IdempotentRepository<String> getIdempotentRepository() {
+        return idempotentRepository;
+    }
+
+    /**
+     * The idempotent repository to use as the store for the read locks.
+     */
+    public void setIdempotentRepository(IdempotentRepository<String> idempotentRepository) {
+        this.idempotentRepository = idempotentRepository;
+    }
+
+    /**
+     * Whether to remove the file from the idempotent repository when doing a rollback.
+     * <p/>
+     * By default this is true.
+     */
+    public boolean isRemoveOnRollback() {
+        return removeOnRollback;
+    }
+
+    /**
+     * Whether to remove the file from the idempotent repository when doing a rollback.
+     * <p/>
+     * By default this is true.
+     */
+    public void setRemoveOnRollback(boolean removeOnRollback) {
+        this.removeOnRollback = removeOnRollback;
+    }
+
+    /**
+     * Computes the repository key for the file, which is its absolute file path.
+     *
+     * @param file the file
+     * @return the key used in the idempotent repository
+     */
+    protected String asKey(GenericFile<File> file) {
+        return file.getAbsoluteFilePath();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        ObjectHelper.notNull(camelContext, "camelContext", this);
+        ObjectHelper.notNull(idempotentRepository, "idempotentRepository", this);
+
+        // ensure the idempotent repository is added as a service so CamelContext will stop the repo when it shuts itself down
+        camelContext.addService(idempotentRepository, true);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // noop
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/78ccf133/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java
index ae5b0ff..8fd94f5 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java
@@ -123,7 +123,7 @@ public class FileLockExclusiveReadLockStrategy extends MarkerFileExclusiveReadLo
IOHelper.close(randomAccessFile, "while acquiring exclusive read lock for file: " + target, LOG);
// and also must release super lock
- super.releaseExclusiveReadLock(operations, file, exchange);
+ super.releaseExclusiveReadLockOnAbort(operations, file, exchange);
}
}
@@ -135,11 +135,10 @@ public class FileLockExclusiveReadLockStrategy extends MarkerFileExclusiveReadLo
}
@Override
- public void releaseExclusiveReadLock(GenericFileOperations<File> operations,
- GenericFile<File> file, Exchange exchange) throws Exception {
-
+ protected void doReleaseExclusiveReadLock(GenericFileOperations<File> operations,
+ GenericFile<File> file, Exchange exchange) throws Exception {
// must call super
- super.releaseExclusiveReadLock(operations, file, exchange);
+ super.doReleaseExclusiveReadLock(operations, file, exchange);
String target = file.getFileName();
FileLock lock = exchange.getProperty(Exchange.FILE_LOCK_EXCLUSIVE_LOCK, FileLock.class);
http://git-wip-us.apache.org/repos/asf/camel/blob/78ccf133/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
index ed5bd4e..5a31374 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
@@ -24,6 +24,7 @@ import org.apache.camel.Expression;
import org.apache.camel.LoggingLevel;
import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy;
import org.apache.camel.component.file.GenericFileProcessStrategy;
+import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.spi.Language;
import org.apache.camel.util.ObjectHelper;
@@ -127,6 +128,17 @@ public final class FileProcessStrategyFactory {
readLockStrategy.setMinAge(minAge);
}
strategy = readLockStrategy;
+ } else if ("idempotent".equals(readLock)) {
+ FileIdempotentRepositoryReadLockStrategy readLockStrategy = new FileIdempotentRepositoryReadLockStrategy();
+ Boolean readLockRemoveOnRollback = (Boolean) params.get("readLockRemoveOnRollback");
+ if (readLockRemoveOnRollback != null) {
+ readLockStrategy.setRemoveOnRollback(readLockRemoveOnRollback);
+ }
+ IdempotentRepository repo = (IdempotentRepository) params.get("readLockIdempotentRepository");
+ if (repo != null) {
+ readLockStrategy.setIdempotentRepository(repo);
+ }
+ strategy = readLockStrategy;
}
if (strategy != null) {
http://git-wip-us.apache.org/repos/asf/camel/blob/78ccf133/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameExclusiveReadLockStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameExclusiveReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameExclusiveReadLockStrategy.java
index a2bbc49..f4e4ed7 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameExclusiveReadLockStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameExclusiveReadLockStrategy.java
@@ -49,18 +49,40 @@ public class FileRenameExclusiveReadLockStrategy extends GenericFileRenameExclus
}
@Override
- public void releaseExclusiveReadLock(GenericFileOperations<File> operations,
- GenericFile<File> file, Exchange exchange) throws Exception {
+ public void releaseExclusiveReadLockOnAbort(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
// must call marker first
try {
if (markerFile) {
- marker.releaseExclusiveReadLock(operations, file, exchange);
+ marker.releaseExclusiveReadLockOnAbort(operations, file, exchange);
}
} finally {
- super.releaseExclusiveReadLock(operations, file, exchange);
+ super.releaseExclusiveReadLockOnAbort(operations, file, exchange);
}
}
+ @Override
+ public void releaseExclusiveReadLockOnRollback(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
+ // must call marker first
+ try {
+ if (markerFile) {
+ marker.releaseExclusiveReadLockOnRollback(operations, file, exchange);
+ }
+ } finally {
+ super.releaseExclusiveReadLockOnRollback(operations, file, exchange);
+ }
+ }
+
+ @Override
+ public void releaseExclusiveReadLockOnCommit(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
+ // must call marker first
+ try {
+ if (markerFile) {
+ marker.releaseExclusiveReadLockOnCommit(operations, file, exchange);
+ }
+ } finally {
+ super.releaseExclusiveReadLockOnCommit(operations, file, exchange);
+ }
+ }
@Override
public void setMarkerFiler(boolean markerFile) {
http://git-wip-us.apache.org/repos/asf/camel/blob/78ccf133/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java
index 504593f..cbe125e 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java
@@ -62,7 +62,7 @@ public abstract class GenericFileProcessStrategySupport<T> implements GenericFil
// must release lock last
if (exclusiveReadLockStrategy != null) {
- exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange);
+ exclusiveReadLockStrategy.releaseExclusiveReadLockOnAbort(operations, file, exchange);
}
}
@@ -72,7 +72,7 @@ public abstract class GenericFileProcessStrategySupport<T> implements GenericFil
// must release lock last
if (exclusiveReadLockStrategy != null) {
- exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange);
+ exclusiveReadLockStrategy.releaseExclusiveReadLockOnCommit(operations, file, exchange);
}
}
@@ -82,7 +82,7 @@ public abstract class GenericFileProcessStrategySupport<T> implements GenericFil
// must release lock last
if (exclusiveReadLockStrategy != null) {
- exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange);
+ exclusiveReadLockStrategy.releaseExclusiveReadLockOnRollback(operations, file, exchange);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/78ccf133/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java
index fe8343a..062c7ee 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java
@@ -88,8 +88,17 @@ public class GenericFileRenameExclusiveReadLockStrategy<T> implements GenericFil
}
@Override
- public void releaseExclusiveReadLock(GenericFileOperations<T> operations, GenericFile<T> file,
- Exchange exchange) throws Exception {
+ public void releaseExclusiveReadLockOnAbort(GenericFileOperations<T> operations, GenericFile<T> file, Exchange exchange) throws Exception {
+ // noop
+ }
+
+ @Override
+ public void releaseExclusiveReadLockOnRollback(GenericFileOperations<T> operations, GenericFile<T> file, Exchange exchange) throws Exception {
+ // noop
+ }
+
+ @Override
+ public void releaseExclusiveReadLockOnCommit(GenericFileOperations<T> operations, GenericFile<T> file, Exchange exchange) throws Exception {
// noop
}
http://git-wip-us.apache.org/repos/asf/camel/blob/78ccf133/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java
index 7378392..9271b0f 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java
@@ -67,7 +67,7 @@ public class GenericFileRenameProcessStrategy<T> extends GenericFileProcessStrat
}
} finally {
if (exclusiveReadLockStrategy != null) {
- exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange);
+ exclusiveReadLockStrategy.releaseExclusiveReadLockOnRollback(operations, file, exchange);
}
deleteLocalWorkFile(exchange);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/78ccf133/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java
index 6deebff..9792df0 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java
@@ -76,8 +76,22 @@ public class MarkerFileExclusiveReadLockStrategy implements GenericFileExclusive
}
@Override
- public void releaseExclusiveReadLock(GenericFileOperations<File> operations,
- GenericFile<File> file, Exchange exchange) throws Exception {
+ public void releaseExclusiveReadLockOnAbort(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
+ doReleaseExclusiveReadLock(operations, file, exchange);
+ }
+
+ @Override
+ public void releaseExclusiveReadLockOnRollback(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
+ doReleaseExclusiveReadLock(operations, file, exchange);
+ }
+
+ @Override
+ public void releaseExclusiveReadLockOnCommit(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
+ doReleaseExclusiveReadLock(operations, file, exchange);
+ }
+
+ protected void doReleaseExclusiveReadLock(GenericFileOperations<File> operations,
+ GenericFile<File> file, Exchange exchange) throws Exception {
if (!markerFile) {
// if not using marker file then nothing to release
return;