You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/06/02 12:32:57 UTC

[2/2] git commit: CAMEL-6413: Fixed a race condition in the file consumer: the read lock must be released last, after all other commit/rollback cleanup.

CAMEL-6413: Fixed a race condition in the file consumer: the read lock must be released last, after all other commit/rollback cleanup.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/afb28228
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/afb28228
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/afb28228

Branch: refs/heads/camel-2.11.x
Commit: afb28228adb55e27465906e3e4c7697b643fe4b6
Parents: ea2aee8
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Jun 2 11:42:08 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Jun 2 12:29:34 2013 +0200

----------------------------------------------------------------------
 .../strategy/GenericFileDeleteProcessStrategy.java |   95 ++++++++------
 .../GenericFileProcessStrategySupport.java         |   19 ++--
 .../strategy/GenericFileRenameProcessStrategy.java |   61 +++++----
 3 files changed, 99 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/afb28228/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileDeleteProcessStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileDeleteProcessStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileDeleteProcessStrategy.java
index 26b3850..b2ccb8e 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileDeleteProcessStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileDeleteProcessStrategy.java
@@ -50,54 +50,67 @@ public class GenericFileDeleteProcessStrategy<T> extends GenericFileProcessStrat
 
     @Override
     public void commit(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
-        // must invoke super
-        super.commit(operations, endpoint, exchange, file);
-
-        int retries = 3;
-        boolean deleted = false;
-
-        while (retries > 0 && !deleted) {
-            retries--;
-
-            if (operations.deleteFile(file.getAbsoluteFilePath())) {
-                // file is deleted
-                deleted = true;
-                break;
+        try {
+            deleteLocalWorkFile(exchange);
+            operations.releaseRetreivedFileResources(exchange);
+
+            int retries = 3;
+            boolean deleted = false;
+
+            while (retries > 0 && !deleted) {
+                retries--;
+
+                if (operations.deleteFile(file.getAbsoluteFilePath())) {
+                    // file is deleted
+                    deleted = true;
+                    break;
+                }
+
+                // some OS can report false when deleting but the file is still deleted
+                // use exists to check instead
+                boolean exits = operations.existsFile(file.getAbsoluteFilePath());
+                if (!exits) {
+                    deleted = true;
+                } else {
+                    log.trace("File was not deleted at this attempt will try again in 1 sec.: {}", file);
+                    // sleep a bit and try again
+                    Thread.sleep(1000);
+                }
             }
-
-            // some OS can report false when deleting but the file is still deleted
-            // use exists to check instead
-            boolean exits = operations.existsFile(file.getAbsoluteFilePath());
-            if (!exits) {
-                deleted = true;
-            } else {
-                log.trace("File was not deleted at this attempt will try again in 1 sec.: {}", file);
-                // sleep a bit and try again
-                Thread.sleep(1000);
+            if (!deleted) {
+                throw new GenericFileOperationFailedException("Cannot delete file: " + file);
+            }
+        } finally {
+            // must release lock last
+            if (exclusiveReadLockStrategy != null) {
+                exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange);
             }
-        }
-
-        if (!deleted) {
-            throw new GenericFileOperationFailedException("Cannot delete file: " + file);
         }
     }
 
     @Override
     public void rollback(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
-        // must invoke super
-        super.rollback(operations, endpoint, exchange, file);
-
-        // moved the failed file if specifying the moveFailed option
-        if (failureRenamer != null) {
-            // create a copy and bind the file to the exchange to be used by the renamer to evaluate the file name
-            Exchange copy = exchange.copy();
-            file.bindToExchange(copy);
-            // must preserve message id
-            copy.getIn().setMessageId(exchange.getIn().getMessageId());
-            copy.setExchangeId(exchange.getExchangeId());
-
-            GenericFile<T> newName = failureRenamer.renameFile(copy, file);
-            renameFile(operations, file, newName);
+        try {
+            deleteLocalWorkFile(exchange);
+            operations.releaseRetreivedFileResources(exchange);
+
+            // moved the failed file if specifying the moveFailed option
+            if (failureRenamer != null) {
+                // create a copy and bind the file to the exchange to be used by the renamer to evaluate the file name
+                Exchange copy = exchange.copy();
+                file.bindToExchange(copy);
+                // must preserve message id
+                copy.getIn().setMessageId(exchange.getIn().getMessageId());
+                copy.setExchangeId(exchange.getExchangeId());
+
+                GenericFile<T> newName = failureRenamer.renameFile(copy, file);
+                renameFile(operations, file, newName);
+            }
+        } finally {
+            // must release lock last
+            if (exclusiveReadLockStrategy != null) {
+                exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/afb28228/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java
index 06254dc..b1ddcc2 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java
@@ -57,30 +57,33 @@ public abstract class GenericFileProcessStrategySupport<T> implements GenericFil
     }
 
     public void abort(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
+        deleteLocalWorkFile(exchange);
+        operations.releaseRetreivedFileResources(exchange);
+
+        // must release lock last
         if (exclusiveReadLockStrategy != null) {
             exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange);
         }
+    }
 
+    public void commit(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
         deleteLocalWorkFile(exchange);
         operations.releaseRetreivedFileResources(exchange);
-    }
 
-    public void commit(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
+        // must release lock last
         if (exclusiveReadLockStrategy != null) {
             exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange);
         }
+    }
 
+    public void rollback(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
         deleteLocalWorkFile(exchange);
         operations.releaseRetreivedFileResources(exchange);
-    }
 
-    public void rollback(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
+        // must release lock last
         if (exclusiveReadLockStrategy != null) {
             exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange);
         }
-
-        deleteLocalWorkFile(exchange);
-        operations.releaseRetreivedFileResources(exchange);
     }
 
     public GenericFileExclusiveReadLockStrategy<T> getExclusiveReadLockStrategy() {
@@ -115,7 +118,7 @@ public abstract class GenericFileProcessStrategySupport<T> implements GenericFil
         return to;
     }
 
-    private void deleteLocalWorkFile(Exchange exchange) {
+    protected void deleteLocalWorkFile(Exchange exchange) {
         // delete local work file, if it was used (eg by ftp component)
         File local = exchange.getIn().getHeader(Exchange.FILE_LOCAL_WORK_PATH, File.class);
         if (local != null && local.exists()) {

http://git-wip-us.apache.org/repos/asf/camel/blob/afb28228/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java
index 106918a..7378392 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java
@@ -31,7 +31,6 @@ public class GenericFileRenameProcessStrategy<T> extends GenericFileProcessStrat
 
     @Override
     public boolean begin(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
-
         // must invoke super
         boolean result = super.begin(operations, endpoint, exchange, file);
         if (!result) {
@@ -52,37 +51,45 @@ public class GenericFileRenameProcessStrategy<T> extends GenericFileProcessStrat
 
     @Override
     public void rollback(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
-        // must invoke super
-        super.rollback(operations, endpoint, exchange, file);
-
-        if (failureRenamer != null) {
-            // create a copy and bind the file to the exchange to be used by the renamer to evaluate the file name
-            Exchange copy = exchange.copy();
-            file.bindToExchange(copy);
-            // must preserve message id
-            copy.getIn().setMessageId(exchange.getIn().getMessageId());
-            copy.setExchangeId(exchange.getExchangeId());
-
-            GenericFile<T> newName = failureRenamer.renameFile(copy, file);
-            renameFile(operations, file, newName);
+        try {
+            operations.releaseRetreivedFileResources(exchange);
+
+            if (failureRenamer != null) {
+                // create a copy and bind the file to the exchange to be used by the renamer to evaluate the file name
+                Exchange copy = exchange.copy();
+                file.bindToExchange(copy);
+                // must preserve message id
+                copy.getIn().setMessageId(exchange.getIn().getMessageId());
+                copy.setExchangeId(exchange.getExchangeId());
+
+                GenericFile<T> newName = failureRenamer.renameFile(copy, file);
+                renameFile(operations, file, newName);
+            }
+        } finally {
+            if (exclusiveReadLockStrategy != null) {
+                exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange);
+            }
+            deleteLocalWorkFile(exchange);
         }
     }
 
     @Override
     public void commit(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
-        // must invoke super
-        super.commit(operations, endpoint, exchange, file);
-
-        if (commitRenamer != null) {
-            // create a copy and bind the file to the exchange to be used by the renamer to evaluate the file name
-            Exchange copy = exchange.copy();
-            file.bindToExchange(copy);
-            // must preserve message id
-            copy.getIn().setMessageId(exchange.getIn().getMessageId());
-            copy.setExchangeId(exchange.getExchangeId());
-
-            GenericFile<T> newName = commitRenamer.renameFile(copy, file);
-            renameFile(operations, file, newName);
+        try {
+            if (commitRenamer != null) {
+                // create a copy and bind the file to the exchange to be used by the renamer to evaluate the file name
+                Exchange copy = exchange.copy();
+                file.bindToExchange(copy);
+                // must preserve message id
+                copy.getIn().setMessageId(exchange.getIn().getMessageId());
+                copy.setExchangeId(exchange.getExchangeId());
+
+                GenericFile<T> newName = commitRenamer.renameFile(copy, file);
+                renameFile(operations, file, newName);
+            }
+        } finally {
+            // must invoke super
+            super.commit(operations, endpoint, exchange, file);
         }
     }