You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/07/16 21:55:06 UTC
[3/4] camel git commit: CAMEL-8954: File componet readlock state
should be per file so its possible to use pollEnrich to poll in a 2nd file
and keep those state separated. Thanks to Andy Fedotov for unit test.
CAMEL-8954: File componet readlock state should be per file so its possible to use pollEnrich to poll in a 2nd file and keep those state separated. Thanks to Andy Fedotov for unit test.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0a6dc865
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0a6dc865
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0a6dc865
Branch: refs/heads/camel-2.15.x
Commit: 0a6dc865a6670b791dc46bcaf4a99f38f5b65c88
Parents: 6cb8770
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Jul 16 19:39:55 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Jul 16 22:01:30 2015 +0200
----------------------------------------------------------------------
.../camel/component/file/GenericFile.java | 10 +++
.../FileLockExclusiveReadLockStrategy.java | 21 ++++--
.../MarkerFileExclusiveReadLockStrategy.java | 20 ++++--
...FileExclusiveReadLockStrategyUnlockTest.java | 73 ++++++++++++++++++++
4 files changed, 114 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/0a6dc865/camel-core/src/main/java/org/apache/camel/component/file/GenericFile.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFile.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFile.java
index 343d836..907de21 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFile.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFile.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
public class GenericFile<T> implements WrappedFile<T> {
private static final Logger LOG = LoggerFactory.getLogger(GenericFile.class);
+ private String copyFromAbsoluteFilePath;
private String endpointPath;
private String fileName;
private String fileNameOnly;
@@ -66,6 +67,7 @@ public class GenericFile<T> implements WrappedFile<T> {
} catch (Exception e) {
throw ObjectHelper.wrapRuntimeCamelException(e);
}
+ result.setCopyFromAbsoluteFilePath(source.getAbsoluteFilePath());
result.setEndpointPath(source.getEndpointPath());
result.setAbsolute(source.isAbsolute());
result.setDirectory(source.isDirectory());
@@ -365,6 +367,14 @@ public class GenericFile<T> implements WrappedFile<T> {
this.directory = directory;
}
+ public String getCopyFromAbsoluteFilePath() {
+ return copyFromAbsoluteFilePath;
+ }
+
+ public void setCopyFromAbsoluteFilePath(String copyFromAbsoluteFilePath) {
+ this.copyFromAbsoluteFilePath = copyFromAbsoluteFilePath;
+ }
+
/**
* Fixes the path separator to be according to the protocol
*/
http://git-wip-us.apache.org/repos/asf/camel/blob/0a6dc865/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java
index ae5b0ff..7b5f1ce 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java
@@ -127,10 +127,11 @@ public class FileLockExclusiveReadLockStrategy extends MarkerFileExclusiveReadLo
}
}
- // we grabbed the lock
- exchange.setProperty(Exchange.FILE_LOCK_EXCLUSIVE_LOCK, lock);
- exchange.setProperty(Exchange.FILE_LOCK_RANDOM_ACCESS_FILE, randomAccessFile);
+ // store read-lock state
+ exchange.setProperty(asReadLockKey(file, Exchange.FILE_LOCK_EXCLUSIVE_LOCK), lock);
+ exchange.setProperty(asReadLockKey(file, Exchange.FILE_LOCK_RANDOM_ACCESS_FILE), randomAccessFile);
+ // we grabbed the lock
return true;
}
@@ -141,10 +142,10 @@ public class FileLockExclusiveReadLockStrategy extends MarkerFileExclusiveReadLo
// must call super
super.releaseExclusiveReadLock(operations, file, exchange);
- String target = file.getFileName();
- FileLock lock = exchange.getProperty(Exchange.FILE_LOCK_EXCLUSIVE_LOCK, FileLock.class);
- RandomAccessFile rac = exchange.getProperty(Exchange.FILE_LOCK_RANDOM_ACCESS_FILE, RandomAccessFile.class);
+ FileLock lock = exchange.getProperty(asReadLockKey(file, Exchange.FILE_LOCK_EXCLUSIVE_LOCK), FileLock.class);
+ RandomAccessFile rac = exchange.getProperty(asReadLockKey(file, Exchange.FILE_LOCK_EXCLUSIVE_LOCK), RandomAccessFile.class);
+ String target = file.getFileName();
if (lock != null) {
Channel channel = lock.acquiredBy();
try {
@@ -187,4 +188,12 @@ public class FileLockExclusiveReadLockStrategy extends MarkerFileExclusiveReadLo
this.readLockLoggingLevel = readLockLoggingLevel;
}
+ private static String asReadLockKey(GenericFile file, String key) {
+ // use the copy from absolute path as that was the original path of the file when the lock was acquired
+ // for example if the file consumer uses preMove then the file is moved and therefore has another name
+ // that would no longer match
+ String path = file.getCopyFromAbsoluteFilePath() != null ? file.getCopyFromAbsoluteFilePath() : file.getAbsoluteFilePath();
+ return path + "-" + key;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/0a6dc865/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java
index 6deebff..b9676f5 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java
@@ -69,8 +69,10 @@ public class MarkerFileExclusiveReadLockStrategy implements GenericFileExclusive
// create a plain file as marker filer for locking (do not use FileLock)
boolean acquired = FileUtil.createNewFile(new File(lockFileName));
- exchange.setProperty(Exchange.FILE_LOCK_FILE_ACQUIRED, acquired);
- exchange.setProperty(Exchange.FILE_LOCK_FILE_NAME, lockFileName);
+
+ // store read-lock state
+ exchange.setProperty(asReadLockKey(file, Exchange.FILE_LOCK_FILE_ACQUIRED), acquired);
+ exchange.setProperty(asReadLockKey(file, Exchange.FILE_LOCK_FILE_NAME), lockFileName);
return acquired;
}
@@ -83,9 +85,11 @@ public class MarkerFileExclusiveReadLockStrategy implements GenericFileExclusive
return;
}
+ boolean acquired = exchange.getProperty(asReadLockKey(file, Exchange.FILE_LOCK_FILE_ACQUIRED), false, Boolean.class);
+
// only release the file if camel get the lock before
- if (exchange.getProperty(Exchange.FILE_LOCK_FILE_ACQUIRED, false, Boolean.class)) {
- String lockFileName = exchange.getProperty(Exchange.FILE_LOCK_FILE_NAME, getLockFileName(file), String.class);
+ if (acquired) {
+ String lockFileName = exchange.getProperty(asReadLockKey(file, Exchange.FILE_LOCK_FILE_NAME), String.class);
File lock = new File(lockFileName);
if (lock.exists()) {
@@ -139,4 +143,12 @@ public class MarkerFileExclusiveReadLockStrategy implements GenericFileExclusive
return file.getAbsoluteFilePath() + FileComponent.DEFAULT_LOCK_FILE_POSTFIX;
}
+ private static String asReadLockKey(GenericFile file, String key) {
+ // use the copy from absolute path as that was the original path of the file when the lock was acquired
+ // for example if the file consumer uses preMove then the file is moved and therefore has another name
+ // that would no longer match
+ String path = file.getCopyFromAbsoluteFilePath() != null ? file.getCopyFromAbsoluteFilePath() : file.getAbsoluteFilePath();
+ return path + "-" + key;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/0a6dc865/camel-core/src/test/java/org/apache/camel/component/file/MarkerFileExclusiveReadLockStrategyUnlockTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/file/MarkerFileExclusiveReadLockStrategyUnlockTest.java b/camel-core/src/test/java/org/apache/camel/component/file/MarkerFileExclusiveReadLockStrategyUnlockTest.java
new file mode 100644
index 0000000..b12b8b2
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/component/file/MarkerFileExclusiveReadLockStrategyUnlockTest.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import java.io.FileOutputStream;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.builder.RouteBuilder;
+
+public class MarkerFileExclusiveReadLockStrategyUnlockTest extends ContextTestSupport {
+
+ @Override
+ protected void setUp() throws Exception {
+ setupDirectory();
+ super.setUp();
+ }
+
+ public void testUnlocking() throws Exception {
+ NotifyBuilder notify = new NotifyBuilder(context).whenDone(1).create();
+ writeFiles();
+ boolean done = notify.matches(5, TimeUnit.SECONDS);
+
+ assertTrue("Route should be done processing 1 exchanges", done);
+
+ assertFileNotExists("target/marker-unlock/input-a/file1.dat.camelLock");
+ assertFileNotExists("target/marker-unlock/input-b/file2.dat.camelLock");
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("file:target/marker-unlock/input-a?fileName=file1.dat&readLock=markerFile")
+ .pollEnrich("file:target/marker-unlock/input-b?fileName=file2.dat&readLock=markerFile")
+ .to("mock:result");
+ }
+ };
+ }
+
+ private void setupDirectory() {
+ deleteDirectory("target/marker-unlock/");
+ createDirectory("target/marker-unlock/input-a");
+ createDirectory("target/marker-unlock/input-b");
+ }
+
+ private void writeFiles() throws Exception {
+ FileOutputStream fos1 = new FileOutputStream("target/marker-unlock/input-a/file1.dat");
+ FileOutputStream fos2 = new FileOutputStream("target/marker-unlock/input-b/file2.dat");
+ fos1.write("File-1".getBytes());
+ fos2.write("File-2".getBytes());
+ fos1.flush();
+ fos1.close();
+ fos2.flush();
+ fos2.close();
+ }
+}