You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/06/26 22:13:09 UTC

[3/6] git commit: CAMEL-7407: file/ftp consumer should eager remove in progress files in case pollDirectory throw exception.

CAMEL-7407: file/ftp consumer should eager remove in progress files in case pollDirectory throw exception.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ea9ac3c0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ea9ac3c0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ea9ac3c0

Branch: refs/heads/camel-2.13.x
Commit: ea9ac3c0bb86e9f0f973222672c340673822ff07
Parents: c88d8eb
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Jun 26 21:35:24 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Jun 26 22:12:26 2014 +0200

----------------------------------------------------------------------
 .../component/file/GenericFileConsumer.java     | 24 +++++++++++++++++++-
 1 file changed, 23 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ea9ac3c0/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
index 2b86c7f..8a39211 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
@@ -114,7 +114,17 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
 
         // time how long time it takes to poll
         StopWatch stop = new StopWatch();
-        boolean limitHit = !pollDirectory(name, files, 0);
+        boolean limitHit;
+        try {
+            limitHit = !pollDirectory(name, files, 0);
+        } catch (Exception e) {
+            // during poll directory we add files to the in progress repository, in case of any exception thrown after this work
+            // we must then drain the in progress files before rethrowing the exception
+            log.debug("Error occurred during poll directory: " + name + " due " + e.getMessage() + ". Removing " + files.size() + " files marked as in-progress.");
+            removeExcessiveInProgressFiles(files);
+            throw e;
+        }
+
         long delta = stop.stop();
         if (log.isDebugEnabled()) {
             log.debug("Took {} to poll: {}", TimeUtils.printDuration(delta), name);
@@ -231,6 +241,18 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
     }
 
     /**
+     * Drain any in progress files as we are done with the files
+     *
+     * @param files  the files
+     */
+    protected void removeExcessiveInProgressFiles(List<GenericFile<T>> files) {
+        for (GenericFile file : files) {
+            String key = file.getAbsoluteFilePath();
+            endpoint.getInProgressRepository().remove(key);
+        }
+    }
+
+    /**
      * Whether or not we can continue polling for more files
      *
      * @param fileList  the current list of gathered files