You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/07/14 17:14:59 UTC
camel git commit: CAMEL-8970: File consumer - Add option to turn
on/off whether to delete orphaned marker lock files.
Repository: camel
Updated Branches:
refs/heads/master 269334e22 -> 16444a7de
CAMEL-8970: File consumer - Add option to turn on/off whether to delete orphaned marker lock files.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/16444a7d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/16444a7d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/16444a7d
Branch: refs/heads/master
Commit: 16444a7de94940c4253e0c4eeddf2d2e6563e84f
Parents: 269334e
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Jul 14 16:21:29 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Jul 14 16:21:29 2015 +0200
----------------------------------------------------------------------
.../component/file/GenericFileEndpoint.java | 18 ++++++
.../GenericFileExclusiveReadLockStrategy.java | 7 +++
...ileIdempotentRepositoryReadLockStrategy.java | 4 ++
.../strategy/FileProcessStrategyFactory.java | 6 +-
...ericFileRenameExclusiveReadLockStrategy.java | 5 ++
.../MarkerFileExclusiveReadLockStrategy.java | 25 +++++---
...ConsumerBridgeRouteExceptionHandlerTest.java | 5 ++
.../FileConsumerCustomExceptionHandlerTest.java | 5 ++
...ileRecursiveDoNotDeleteOldLockFilesTest.java | 62 ++++++++++++++++++++
.../FtpChangedExclusiveReadLockStrategy.java | 5 ++
.../SftpChangedExclusiveReadLockStrategy.java | 5 ++
11 files changed, 138 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/16444a7d/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
index 616fec3..57cce2c 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
@@ -164,6 +164,8 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
protected long readLockTimeout = 10000;
@UriParam(label = "consumer", defaultValue = "true")
protected boolean readLockMarkerFile = true;
+ @UriParam(label = "consumer", defaultValue = "true")
+ protected boolean readLockDeleteOrphanLockFiles = true;
@UriParam(label = "consumer", defaultValue = "WARN")
protected LoggingLevel readLockLoggingLevel = LoggingLevel.WARN;
@UriParam(label = "consumer", defaultValue = "1")
@@ -890,6 +892,21 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
this.readLockMarkerFile = readLockMarkerFile;
}
+ public boolean isReadLockDeleteOrphanLockFiles() {
+ return readLockDeleteOrphanLockFiles;
+ }
+
+ /**
+ * Whether or not read lock with marker files should upon startup delete any orphan read lock files, which may
+ * have been left on the file system, if Camel was not properly shutdown (such as a JVM crash).
+ * <p/>
+ * If turning this option to <tt>false</tt> then any orphaned lock file will cause Camel to not attempt to pickup
+ * that file, this could also be due another node is concurrently reading files from the same shared directory.
+ */
+ public void setReadLockDeleteOrphanLockFiles(boolean readLockDeleteOrphanLockFiles) {
+ this.readLockDeleteOrphanLockFiles = readLockDeleteOrphanLockFiles;
+ }
+
public LoggingLevel getReadLockLoggingLevel() {
return readLockLoggingLevel;
}
@@ -1272,6 +1289,7 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
params.put("readLockTimeout", readLockTimeout);
}
params.put("readLockMarkerFile", readLockMarkerFile);
+ params.put("readLockDeleteOrphanLockFiles", readLockDeleteOrphanLockFiles);
params.put("readLockMinLength", readLockMinLength);
params.put("readLockLoggingLevel", readLockLoggingLevel);
params.put("readLockMinAge", readLockMinAge);
http://git-wip-us.apache.org/repos/asf/camel/blob/16444a7d/camel-core/src/main/java/org/apache/camel/component/file/GenericFileExclusiveReadLockStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileExclusiveReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileExclusiveReadLockStrategy.java
index cf950d2..bf10ae8 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileExclusiveReadLockStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileExclusiveReadLockStrategy.java
@@ -127,4 +127,11 @@ public interface GenericFileExclusiveReadLockStrategy<T> {
*/
void setMarkerFiler(boolean markerFile);
+ /**
+ * Sets whether orphan marker files should be deleted upon startup
+ *
+ * @param deleteOrphanLockFiles <tt>true</tt> to delete files, <tt>false</tt> to skip this check
+ */
+ void setDeleteOrphanLockFiles(boolean deleteOrphanLockFiles);
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/16444a7d/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
index b9cf193..78b7e48 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
@@ -116,6 +116,10 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp
// noop
}
+ public void setDeleteOrphanLockFiles(boolean deleteOrphanLockFiles) {
+ // noop
+ }
+
public CamelContext getCamelContext() {
return camelContext;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/16444a7d/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
index f9ceca2..f5348b0 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
@@ -112,7 +112,7 @@ public final class FileProcessStrategyFactory {
if ("none".equals(readLock) || "false".equals(readLock)) {
return null;
} else if ("markerFile".equals(readLock)) {
- return new MarkerFileExclusiveReadLockStrategy();
+ strategy = new MarkerFileExclusiveReadLockStrategy();
} else if ("fileLock".equals(readLock)) {
strategy = new FileLockExclusiveReadLockStrategy();
} else if ("rename".equals(readLock)) {
@@ -162,6 +162,10 @@ public final class FileProcessStrategyFactory {
if (readLockMarkerFile != null) {
strategy.setMarkerFiler(readLockMarkerFile);
}
+ Boolean readLockDeleteOrphanLockFiles = (Boolean) params.get("readLockDeleteOrphanLockFiles");
+ if (readLockDeleteOrphanLockFiles != null) {
+ strategy.setDeleteOrphanLockFiles(readLockDeleteOrphanLockFiles);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/16444a7d/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java
index 062c7ee..976c965 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java
@@ -136,4 +136,9 @@ public class GenericFileRenameExclusiveReadLockStrategy<T> implements GenericFil
public void setMarkerFiler(boolean markerFile) {
// noop - we do not use marker file with the rename strategy
}
+
+ @Override
+ public void setDeleteOrphanLockFiles(boolean deleteOrphanLockFiles) {
+ // noop - we do not use marker file with the rename strategy
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/16444a7d/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java
index 9792df0..ceabd01 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java
@@ -38,20 +38,24 @@ public class MarkerFileExclusiveReadLockStrategy implements GenericFileExclusive
private static final Logger LOG = LoggerFactory.getLogger(MarkerFileExclusiveReadLockStrategy.class);
private boolean markerFile = true;
+ private boolean deleteOrphanLockFiles = true;
@Override
public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) {
- String dir = endpoint.getConfiguration().getDirectory();
- File file = new File(dir);
+ if (deleteOrphanLockFiles) {
- LOG.debug("Prepare on startup by deleting orphaned lock files from: {}", dir);
+ String dir = endpoint.getConfiguration().getDirectory();
+ File file = new File(dir);
- StopWatch watch = new StopWatch();
- deleteLockFiles(file, endpoint.isRecursive());
+ LOG.debug("Prepare on startup by deleting orphaned lock files from: {}", dir);
- // log anything that takes more than a second
- if (watch.taken() > 1000) {
- LOG.info("Prepared on startup by deleting orphaned lock files from: {} took {} millis to complete.", dir, watch.taken());
+ StopWatch watch = new StopWatch();
+ deleteLockFiles(file, endpoint.isRecursive());
+
+ // log anything that takes more than a second
+ if (watch.taken() > 1000) {
+ LOG.info("Prepared on startup by deleting orphaned lock files from: {} took {} millis to complete.", dir, watch.taken());
+ }
}
}
@@ -130,6 +134,11 @@ public class MarkerFileExclusiveReadLockStrategy implements GenericFileExclusive
this.markerFile = markerFile;
}
+ @Override
+ public void setDeleteOrphanLockFiles(boolean deleteOrphanLockFiles) {
+ this.deleteOrphanLockFiles = deleteOrphanLockFiles;
+ }
+
private static void deleteLockFiles(File dir, boolean recursive) {
File[] files = dir.listFiles();
if (files == null || files.length == 0) {
http://git-wip-us.apache.org/repos/asf/camel/blob/16444a7d/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerBridgeRouteExceptionHandlerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerBridgeRouteExceptionHandlerTest.java b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerBridgeRouteExceptionHandlerTest.java
index 989af6a..8259ec6 100644
--- a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerBridgeRouteExceptionHandlerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerBridgeRouteExceptionHandlerTest.java
@@ -131,6 +131,11 @@ public class FileConsumerBridgeRouteExceptionHandlerTest extends ContextTestSupp
// noop
}
+ @Override
+ public void setDeleteOrphanLockFiles(boolean deleteOrphanLockFiles) {
+ // noop
+ }
+
public int getCounter() {
return counter;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/16444a7d/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomExceptionHandlerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomExceptionHandlerTest.java b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomExceptionHandlerTest.java
index d33dd8d..071ac82 100644
--- a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomExceptionHandlerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomExceptionHandlerTest.java
@@ -188,6 +188,11 @@ public class FileConsumerCustomExceptionHandlerTest extends ContextTestSupport {
// noop
}
+ @Override
+ public void setDeleteOrphanLockFiles(boolean deleteOrphanLockFiles) {
+ // noop
+ }
+
public int getCounter() {
return counter;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/16444a7d/camel-core/src/test/java/org/apache/camel/component/file/FileMarkerFileRecursiveDoNotDeleteOldLockFilesTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/file/FileMarkerFileRecursiveDoNotDeleteOldLockFilesTest.java b/camel-core/src/test/java/org/apache/camel/component/file/FileMarkerFileRecursiveDoNotDeleteOldLockFilesTest.java
new file mode 100644
index 0000000..067c19a
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/component/file/FileMarkerFileRecursiveDoNotDeleteOldLockFilesTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version
+ */
+public class FileMarkerFileRecursiveDoNotDeleteOldLockFilesTest extends ContextTestSupport {
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ deleteDirectory("target/oldlock");
+ template.sendBodyAndHeader("file:target/oldlock", "locked", Exchange.FILE_NAME, "hello.txt" + FileComponent.DEFAULT_LOCK_FILE_POSTFIX);
+ template.sendBodyAndHeader("file:target/oldlock", "Hello World", Exchange.FILE_NAME, "hello.txt");
+ template.sendBodyAndHeader("file:target/oldlock/foo", "locked", Exchange.FILE_NAME, "gooday.txt" + FileComponent.DEFAULT_LOCK_FILE_POSTFIX);
+ template.sendBodyAndHeader("file:target/oldlock/foo", "Goodday World", Exchange.FILE_NAME, "gooday.txt");
+ // and a new file that has no lock
+ template.sendBodyAndHeader("file:target/oldlock", "New World", Exchange.FILE_NAME, "new.txt");
+ }
+
+ public void testDeleteOldLockOnStartup() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("New World");
+ mock.setResultMinimumWaitTime(1000);
+
+ // start the route
+ context.startRoute("foo");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("file:target/oldlock?readLockDeleteOrphanLockFiles=false&recursive=true").routeId("foo").noAutoStartup()
+ .convertBodyTo(String.class).to("log:result", "mock:result");
+ }
+ };
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/16444a7d/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java
index 5301465..a1db2bb 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java
@@ -169,6 +169,11 @@ public class FtpChangedExclusiveReadLockStrategy implements GenericFileExclusive
// noop - not supported by ftp
}
+ @Override
+ public void setDeleteOrphanLockFiles(boolean deleteOrphanLockFiles) {
+ // noop - not supported by ftp
+ }
+
public long getMinLength() {
return minLength;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/16444a7d/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java
index 9b9fec1..5e21dd1 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java
@@ -189,4 +189,9 @@ public class SftpChangedExclusiveReadLockStrategy implements GenericFileExclusiv
public void setMarkerFiler(boolean markerFiler) {
// noop - not supported by ftp
}
+
+ @Override
+ public void setDeleteOrphanLockFiles(boolean deleteOrphanLockFiles) {
+ // noop - not supported by ftp
+ }
}
\ No newline at end of file