You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/06/02 12:32:57 UTC
[2/2] git commit: CAMEL-6413: Fixed race condition in file consumer, as read lock release should be executed last.
CAMEL-6413: Fixed race condition in file consumer, as read lock release should be executed last.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/afb28228
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/afb28228
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/afb28228
Branch: refs/heads/camel-2.11.x
Commit: afb28228adb55e27465906e3e4c7697b643fe4b6
Parents: ea2aee8
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Jun 2 11:42:08 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Jun 2 12:29:34 2013 +0200
----------------------------------------------------------------------
.../strategy/GenericFileDeleteProcessStrategy.java | 95 ++++++++------
.../GenericFileProcessStrategySupport.java | 19 ++--
.../strategy/GenericFileRenameProcessStrategy.java | 61 +++++----
3 files changed, 99 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/afb28228/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileDeleteProcessStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileDeleteProcessStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileDeleteProcessStrategy.java
index 26b3850..b2ccb8e 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileDeleteProcessStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileDeleteProcessStrategy.java
@@ -50,54 +50,67 @@ public class GenericFileDeleteProcessStrategy<T> extends GenericFileProcessStrat
@Override
public void commit(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
- // must invoke super
- super.commit(operations, endpoint, exchange, file);
-
- int retries = 3;
- boolean deleted = false;
-
- while (retries > 0 && !deleted) {
- retries--;
-
- if (operations.deleteFile(file.getAbsoluteFilePath())) {
- // file is deleted
- deleted = true;
- break;
+ try {
+ deleteLocalWorkFile(exchange);
+ operations.releaseRetreivedFileResources(exchange);
+
+ int retries = 3;
+ boolean deleted = false;
+
+ while (retries > 0 && !deleted) {
+ retries--;
+
+ if (operations.deleteFile(file.getAbsoluteFilePath())) {
+ // file is deleted
+ deleted = true;
+ break;
+ }
+
+ // some OS can report false when deleting but the file is still deleted
+ // use exists to check instead
+ boolean exits = operations.existsFile(file.getAbsoluteFilePath());
+ if (!exits) {
+ deleted = true;
+ } else {
+ log.trace("File was not deleted at this attempt will try again in 1 sec.: {}", file);
+ // sleep a bit and try again
+ Thread.sleep(1000);
+ }
}
-
- // some OS can report false when deleting but the file is still deleted
- // use exists to check instead
- boolean exits = operations.existsFile(file.getAbsoluteFilePath());
- if (!exits) {
- deleted = true;
- } else {
- log.trace("File was not deleted at this attempt will try again in 1 sec.: {}", file);
- // sleep a bit and try again
- Thread.sleep(1000);
+ if (!deleted) {
+ throw new GenericFileOperationFailedException("Cannot delete file: " + file);
+ }
+ } finally {
+ // must release lock last
+ if (exclusiveReadLockStrategy != null) {
+ exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange);
}
- }
-
- if (!deleted) {
- throw new GenericFileOperationFailedException("Cannot delete file: " + file);
}
}
@Override
public void rollback(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
- // must invoke super
- super.rollback(operations, endpoint, exchange, file);
-
- // moved the failed file if specifying the moveFailed option
- if (failureRenamer != null) {
- // create a copy and bind the file to the exchange to be used by the renamer to evaluate the file name
- Exchange copy = exchange.copy();
- file.bindToExchange(copy);
- // must preserve message id
- copy.getIn().setMessageId(exchange.getIn().getMessageId());
- copy.setExchangeId(exchange.getExchangeId());
-
- GenericFile<T> newName = failureRenamer.renameFile(copy, file);
- renameFile(operations, file, newName);
+ try {
+ deleteLocalWorkFile(exchange);
+ operations.releaseRetreivedFileResources(exchange);
+
+ // moved the failed file if specifying the moveFailed option
+ if (failureRenamer != null) {
+ // create a copy and bind the file to the exchange to be used by the renamer to evaluate the file name
+ Exchange copy = exchange.copy();
+ file.bindToExchange(copy);
+ // must preserve message id
+ copy.getIn().setMessageId(exchange.getIn().getMessageId());
+ copy.setExchangeId(exchange.getExchangeId());
+
+ GenericFile<T> newName = failureRenamer.renameFile(copy, file);
+ renameFile(operations, file, newName);
+ }
+ } finally {
+ // must release lock last
+ if (exclusiveReadLockStrategy != null) {
+ exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/afb28228/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java
index 06254dc..b1ddcc2 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java
@@ -57,30 +57,33 @@ public abstract class GenericFileProcessStrategySupport<T> implements GenericFil
}
public void abort(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
+ deleteLocalWorkFile(exchange);
+ operations.releaseRetreivedFileResources(exchange);
+
+ // must release lock last
if (exclusiveReadLockStrategy != null) {
exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange);
}
+ }
+ public void commit(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
deleteLocalWorkFile(exchange);
operations.releaseRetreivedFileResources(exchange);
- }
- public void commit(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
+ // must release lock last
if (exclusiveReadLockStrategy != null) {
exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange);
}
+ }
+ public void rollback(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
deleteLocalWorkFile(exchange);
operations.releaseRetreivedFileResources(exchange);
- }
- public void rollback(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
+ // must release lock last
if (exclusiveReadLockStrategy != null) {
exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange);
}
-
- deleteLocalWorkFile(exchange);
- operations.releaseRetreivedFileResources(exchange);
}
public GenericFileExclusiveReadLockStrategy<T> getExclusiveReadLockStrategy() {
@@ -115,7 +118,7 @@ public abstract class GenericFileProcessStrategySupport<T> implements GenericFil
return to;
}
- private void deleteLocalWorkFile(Exchange exchange) {
+ protected void deleteLocalWorkFile(Exchange exchange) {
// delete local work file, if it was used (eg by ftp component)
File local = exchange.getIn().getHeader(Exchange.FILE_LOCAL_WORK_PATH, File.class);
if (local != null && local.exists()) {
http://git-wip-us.apache.org/repos/asf/camel/blob/afb28228/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java
index 106918a..7378392 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java
@@ -31,7 +31,6 @@ public class GenericFileRenameProcessStrategy<T> extends GenericFileProcessStrat
@Override
public boolean begin(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
-
// must invoke super
boolean result = super.begin(operations, endpoint, exchange, file);
if (!result) {
@@ -52,37 +51,45 @@ public class GenericFileRenameProcessStrategy<T> extends GenericFileProcessStrat
@Override
public void rollback(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
- // must invoke super
- super.rollback(operations, endpoint, exchange, file);
-
- if (failureRenamer != null) {
- // create a copy and bind the file to the exchange to be used by the renamer to evaluate the file name
- Exchange copy = exchange.copy();
- file.bindToExchange(copy);
- // must preserve message id
- copy.getIn().setMessageId(exchange.getIn().getMessageId());
- copy.setExchangeId(exchange.getExchangeId());
-
- GenericFile<T> newName = failureRenamer.renameFile(copy, file);
- renameFile(operations, file, newName);
+ try {
+ operations.releaseRetreivedFileResources(exchange);
+
+ if (failureRenamer != null) {
+ // create a copy and bind the file to the exchange to be used by the renamer to evaluate the file name
+ Exchange copy = exchange.copy();
+ file.bindToExchange(copy);
+ // must preserve message id
+ copy.getIn().setMessageId(exchange.getIn().getMessageId());
+ copy.setExchangeId(exchange.getExchangeId());
+
+ GenericFile<T> newName = failureRenamer.renameFile(copy, file);
+ renameFile(operations, file, newName);
+ }
+ } finally {
+ if (exclusiveReadLockStrategy != null) {
+ exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange);
+ }
+ deleteLocalWorkFile(exchange);
}
}
@Override
public void commit(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
- // must invoke super
- super.commit(operations, endpoint, exchange, file);
-
- if (commitRenamer != null) {
- // create a copy and bind the file to the exchange to be used by the renamer to evaluate the file name
- Exchange copy = exchange.copy();
- file.bindToExchange(copy);
- // must preserve message id
- copy.getIn().setMessageId(exchange.getIn().getMessageId());
- copy.setExchangeId(exchange.getExchangeId());
-
- GenericFile<T> newName = commitRenamer.renameFile(copy, file);
- renameFile(operations, file, newName);
+ try {
+ if (commitRenamer != null) {
+ // create a copy and bind the file to the exchange to be used by the renamer to evaluate the file name
+ Exchange copy = exchange.copy();
+ file.bindToExchange(copy);
+ // must preserve message id
+ copy.getIn().setMessageId(exchange.getIn().getMessageId());
+ copy.setExchangeId(exchange.getExchangeId());
+
+ GenericFile<T> newName = commitRenamer.renameFile(copy, file);
+ renameFile(operations, file, newName);
+ }
+ } finally {
+ // must invoke super
+ super.commit(operations, endpoint, exchange, file);
}
}