You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/07/14 17:14:59 UTC

camel git commit: CAMEL-8970: File consumer - Add option to turn on/off whether to delete orphaned marker lock files.

Repository: camel
Updated Branches:
  refs/heads/master 269334e22 -> 16444a7de


CAMEL-8970: File consumer - Add option to turn on/off whether to delete orphaned marker lock files.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/16444a7d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/16444a7d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/16444a7d

Branch: refs/heads/master
Commit: 16444a7de94940c4253e0c4eeddf2d2e6563e84f
Parents: 269334e
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Jul 14 16:21:29 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Jul 14 16:21:29 2015 +0200

----------------------------------------------------------------------
 .../component/file/GenericFileEndpoint.java     | 18 ++++++
 .../GenericFileExclusiveReadLockStrategy.java   |  7 +++
 ...ileIdempotentRepositoryReadLockStrategy.java |  4 ++
 .../strategy/FileProcessStrategyFactory.java    |  6 +-
 ...ericFileRenameExclusiveReadLockStrategy.java |  5 ++
 .../MarkerFileExclusiveReadLockStrategy.java    | 25 +++++---
 ...ConsumerBridgeRouteExceptionHandlerTest.java |  5 ++
 .../FileConsumerCustomExceptionHandlerTest.java |  5 ++
 ...ileRecursiveDoNotDeleteOldLockFilesTest.java | 62 ++++++++++++++++++++
 .../FtpChangedExclusiveReadLockStrategy.java    |  5 ++
 .../SftpChangedExclusiveReadLockStrategy.java   |  5 ++
 11 files changed, 138 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/16444a7d/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
index 616fec3..57cce2c 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
@@ -164,6 +164,8 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
     protected long readLockTimeout = 10000;
     @UriParam(label = "consumer", defaultValue = "true")
     protected boolean readLockMarkerFile = true;
+    @UriParam(label = "consumer", defaultValue = "true")
+    protected boolean readLockDeleteOrphanLockFiles = true;
     @UriParam(label = "consumer", defaultValue = "WARN")
     protected LoggingLevel readLockLoggingLevel = LoggingLevel.WARN;
     @UriParam(label = "consumer", defaultValue = "1")
@@ -890,6 +892,21 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
         this.readLockMarkerFile = readLockMarkerFile;
     }
 
+    public boolean isReadLockDeleteOrphanLockFiles() {
+        return readLockDeleteOrphanLockFiles;
+    }
+
+    /**
+     * Whether or not read lock with marker files should upon startup delete any orphan read lock files, which may
+     * have been left on the file system, if Camel was not properly shutdown (such as a JVM crash).
+     * <p/>
+     * If turning this option to <tt>false</tt> then any orphaned lock file will cause Camel to not attempt to pickup
+     * that file, this could also be due another node is concurrently reading files from the same shared directory.
+     */
+    public void setReadLockDeleteOrphanLockFiles(boolean readLockDeleteOrphanLockFiles) {
+        this.readLockDeleteOrphanLockFiles = readLockDeleteOrphanLockFiles;
+    }
+
     public LoggingLevel getReadLockLoggingLevel() {
         return readLockLoggingLevel;
     }
@@ -1272,6 +1289,7 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
             params.put("readLockTimeout", readLockTimeout);
         }
         params.put("readLockMarkerFile", readLockMarkerFile);
+        params.put("readLockDeleteOrphanLockFiles", readLockDeleteOrphanLockFiles);
         params.put("readLockMinLength", readLockMinLength);
         params.put("readLockLoggingLevel", readLockLoggingLevel);
         params.put("readLockMinAge", readLockMinAge);

http://git-wip-us.apache.org/repos/asf/camel/blob/16444a7d/camel-core/src/main/java/org/apache/camel/component/file/GenericFileExclusiveReadLockStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileExclusiveReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileExclusiveReadLockStrategy.java
index cf950d2..bf10ae8 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileExclusiveReadLockStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileExclusiveReadLockStrategy.java
@@ -127,4 +127,11 @@ public interface GenericFileExclusiveReadLockStrategy<T> {
      */
     void setMarkerFiler(boolean markerFile);
 
+    /**
+     * Sets whether orphan marker files should be deleted upon startup
+     *
+     * @param deleteOrphanLockFiles <tt>true</tt> to delete files, <tt>false</tt> to skip this check
+     */
+    void setDeleteOrphanLockFiles(boolean deleteOrphanLockFiles);
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/16444a7d/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
index b9cf193..78b7e48 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
@@ -116,6 +116,10 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp
         // noop
     }
 
+    public void setDeleteOrphanLockFiles(boolean deleteOrphanLockFiles) {
+        // noop
+    }
+
     public CamelContext getCamelContext() {
         return camelContext;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/16444a7d/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
index f9ceca2..f5348b0 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
@@ -112,7 +112,7 @@ public final class FileProcessStrategyFactory {
             if ("none".equals(readLock) || "false".equals(readLock)) {
                 return null;
             } else if ("markerFile".equals(readLock)) {
-                return new MarkerFileExclusiveReadLockStrategy();
+                strategy = new MarkerFileExclusiveReadLockStrategy();
             } else if ("fileLock".equals(readLock)) {
                 strategy = new FileLockExclusiveReadLockStrategy();
             } else if ("rename".equals(readLock)) {
@@ -162,6 +162,10 @@ public final class FileProcessStrategyFactory {
                 if (readLockMarkerFile != null) {
                     strategy.setMarkerFiler(readLockMarkerFile);
                 }
+                Boolean readLockDeleteOrphanLockFiles = (Boolean) params.get("readLockDeleteOrphanLockFiles");
+                if (readLockDeleteOrphanLockFiles != null) {
+                    strategy.setDeleteOrphanLockFiles(readLockDeleteOrphanLockFiles);
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/16444a7d/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java
index 062c7ee..976c965 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java
@@ -136,4 +136,9 @@ public class GenericFileRenameExclusiveReadLockStrategy<T> implements GenericFil
     public void setMarkerFiler(boolean markerFile) {
         // noop - we do not use marker file with the rename strategy
     }
+
+    @Override
+    public void setDeleteOrphanLockFiles(boolean deleteOrphanLockFiles) {
+        // noop - we do not use marker file with the rename strategy
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/16444a7d/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java
index 9792df0..ceabd01 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java
@@ -38,20 +38,24 @@ public class MarkerFileExclusiveReadLockStrategy implements GenericFileExclusive
     private static final Logger LOG = LoggerFactory.getLogger(MarkerFileExclusiveReadLockStrategy.class);
 
     private boolean markerFile = true;
+    private boolean deleteOrphanLockFiles = true;
 
     @Override
     public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) {
-        String dir = endpoint.getConfiguration().getDirectory();
-        File file = new File(dir);
+        if (deleteOrphanLockFiles) {
 
-        LOG.debug("Prepare on startup by deleting orphaned lock files from: {}", dir);
+            String dir = endpoint.getConfiguration().getDirectory();
+            File file = new File(dir);
 
-        StopWatch watch = new StopWatch();
-        deleteLockFiles(file, endpoint.isRecursive());
+            LOG.debug("Prepare on startup by deleting orphaned lock files from: {}", dir);
 
-        // log anything that takes more than a second
-        if (watch.taken() > 1000) {
-            LOG.info("Prepared on startup by deleting orphaned lock files from: {} took {} millis to complete.", dir, watch.taken());
+            StopWatch watch = new StopWatch();
+            deleteLockFiles(file, endpoint.isRecursive());
+
+            // log anything that takes more than a second
+            if (watch.taken() > 1000) {
+                LOG.info("Prepared on startup by deleting orphaned lock files from: {} took {} millis to complete.", dir, watch.taken());
+            }
         }
     }
 
@@ -130,6 +134,11 @@ public class MarkerFileExclusiveReadLockStrategy implements GenericFileExclusive
         this.markerFile = markerFile;
     }
 
+    @Override
+    public void setDeleteOrphanLockFiles(boolean deleteOrphanLockFiles) {
+        this.deleteOrphanLockFiles = deleteOrphanLockFiles;
+    }
+
     private static void deleteLockFiles(File dir, boolean recursive) {
         File[] files = dir.listFiles();
         if (files == null || files.length == 0) {

http://git-wip-us.apache.org/repos/asf/camel/blob/16444a7d/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerBridgeRouteExceptionHandlerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerBridgeRouteExceptionHandlerTest.java b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerBridgeRouteExceptionHandlerTest.java
index 989af6a..8259ec6 100644
--- a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerBridgeRouteExceptionHandlerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerBridgeRouteExceptionHandlerTest.java
@@ -131,6 +131,11 @@ public class FileConsumerBridgeRouteExceptionHandlerTest extends ContextTestSupp
             // noop
         }
 
+        @Override
+        public void setDeleteOrphanLockFiles(boolean deleteOrphanLockFiles) {
+            // noop
+        }
+
         public int getCounter() {
             return counter;
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/16444a7d/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomExceptionHandlerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomExceptionHandlerTest.java b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomExceptionHandlerTest.java
index d33dd8d..071ac82 100644
--- a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomExceptionHandlerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomExceptionHandlerTest.java
@@ -188,6 +188,11 @@ public class FileConsumerCustomExceptionHandlerTest extends ContextTestSupport {
             // noop
         }
 
+        @Override
+        public void setDeleteOrphanLockFiles(boolean deleteOrphanLockFiles) {
+            // noop
+        }
+
         public int getCounter() {
             return counter;
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/16444a7d/camel-core/src/test/java/org/apache/camel/component/file/FileMarkerFileRecursiveDoNotDeleteOldLockFilesTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/file/FileMarkerFileRecursiveDoNotDeleteOldLockFilesTest.java b/camel-core/src/test/java/org/apache/camel/component/file/FileMarkerFileRecursiveDoNotDeleteOldLockFilesTest.java
new file mode 100644
index 0000000..067c19a
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/component/file/FileMarkerFileRecursiveDoNotDeleteOldLockFilesTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version 
+ */
+public class FileMarkerFileRecursiveDoNotDeleteOldLockFilesTest extends ContextTestSupport {
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        deleteDirectory("target/oldlock");
+        template.sendBodyAndHeader("file:target/oldlock", "locked", Exchange.FILE_NAME, "hello.txt" + FileComponent.DEFAULT_LOCK_FILE_POSTFIX);
+        template.sendBodyAndHeader("file:target/oldlock", "Hello World", Exchange.FILE_NAME, "hello.txt");
+        template.sendBodyAndHeader("file:target/oldlock/foo", "locked", Exchange.FILE_NAME, "gooday.txt" + FileComponent.DEFAULT_LOCK_FILE_POSTFIX);
+        template.sendBodyAndHeader("file:target/oldlock/foo", "Goodday World", Exchange.FILE_NAME, "gooday.txt");
+        // and a new file that has no lock
+        template.sendBodyAndHeader("file:target/oldlock", "New World", Exchange.FILE_NAME, "new.txt");
+    }
+
+    public void testDeleteOldLockOnStartup() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("New World");
+        mock.setResultMinimumWaitTime(1000);
+
+        // start the route
+        context.startRoute("foo");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("file:target/oldlock?readLockDeleteOrphanLockFiles=false&recursive=true").routeId("foo").noAutoStartup()
+                        .convertBodyTo(String.class).to("log:result", "mock:result");
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/16444a7d/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java
index 5301465..a1db2bb 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java
@@ -169,6 +169,11 @@ public class FtpChangedExclusiveReadLockStrategy implements GenericFileExclusive
         // noop - not supported by ftp
     }
 
+    @Override
+    public void setDeleteOrphanLockFiles(boolean deleteOrphanLockFiles) {
+        // noop - not supported by ftp
+    }
+
     public long getMinLength() {
         return minLength;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/16444a7d/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java
index 9b9fec1..5e21dd1 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java
@@ -189,4 +189,9 @@ public class SftpChangedExclusiveReadLockStrategy implements GenericFileExclusiv
     public void setMarkerFiler(boolean markerFiler) {
         // noop - not supported by ftp
     }
+
+    @Override
+    public void setDeleteOrphanLockFiles(boolean deleteOrphanLockFiles) {
+        // noop - not supported by ftp
+    }
 }
\ No newline at end of file