You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/07/16 21:55:06 UTC

[3/4] camel git commit: CAMEL-8954: File componet readlock state should be per file so its possible to use pollEnrich to poll in a 2nd file and keep those state separated. Thanks to Andy Fedotov for unit test.

CAMEL-8954: File componet readlock state should be per file so its possible to use pollEnrich to poll in a 2nd file and keep those state separated. Thanks to Andy Fedotov for unit test.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0a6dc865
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0a6dc865
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0a6dc865

Branch: refs/heads/camel-2.15.x
Commit: 0a6dc865a6670b791dc46bcaf4a99f38f5b65c88
Parents: 6cb8770
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Jul 16 19:39:55 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Jul 16 22:01:30 2015 +0200

----------------------------------------------------------------------
 .../camel/component/file/GenericFile.java       | 10 +++
 .../FileLockExclusiveReadLockStrategy.java      | 21 ++++--
 .../MarkerFileExclusiveReadLockStrategy.java    | 20 ++++--
 ...FileExclusiveReadLockStrategyUnlockTest.java | 73 ++++++++++++++++++++
 4 files changed, 114 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0a6dc865/camel-core/src/main/java/org/apache/camel/component/file/GenericFile.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFile.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFile.java
index 343d836..907de21 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFile.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFile.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
 public class GenericFile<T> implements WrappedFile<T>  {
     private static final Logger LOG = LoggerFactory.getLogger(GenericFile.class);
 
+    private String copyFromAbsoluteFilePath;
     private String endpointPath;
     private String fileName;
     private String fileNameOnly;
@@ -66,6 +67,7 @@ public class GenericFile<T> implements WrappedFile<T>  {
         } catch (Exception e) {
             throw ObjectHelper.wrapRuntimeCamelException(e);
         }
+        result.setCopyFromAbsoluteFilePath(source.getAbsoluteFilePath());
         result.setEndpointPath(source.getEndpointPath());
         result.setAbsolute(source.isAbsolute());
         result.setDirectory(source.isDirectory());
@@ -365,6 +367,14 @@ public class GenericFile<T> implements WrappedFile<T>  {
         this.directory = directory;
     }
 
+    public String getCopyFromAbsoluteFilePath() {
+        return copyFromAbsoluteFilePath;
+    }
+
+    public void setCopyFromAbsoluteFilePath(String copyFromAbsoluteFilePath) {
+        this.copyFromAbsoluteFilePath = copyFromAbsoluteFilePath;
+    }
+
     /**
      * Fixes the path separator to be according to the protocol
      */

http://git-wip-us.apache.org/repos/asf/camel/blob/0a6dc865/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java
index ae5b0ff..7b5f1ce 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java
@@ -127,10 +127,11 @@ public class FileLockExclusiveReadLockStrategy extends MarkerFileExclusiveReadLo
             }
         }
 
-        // we grabbed the lock
-        exchange.setProperty(Exchange.FILE_LOCK_EXCLUSIVE_LOCK, lock);
-        exchange.setProperty(Exchange.FILE_LOCK_RANDOM_ACCESS_FILE, randomAccessFile);
+        // store read-lock state
+        exchange.setProperty(asReadLockKey(file, Exchange.FILE_LOCK_EXCLUSIVE_LOCK), lock);
+        exchange.setProperty(asReadLockKey(file, Exchange.FILE_LOCK_RANDOM_ACCESS_FILE), randomAccessFile);
 
+        // we grabbed the lock
         return true;
     }
 
@@ -141,10 +142,10 @@ public class FileLockExclusiveReadLockStrategy extends MarkerFileExclusiveReadLo
         // must call super
         super.releaseExclusiveReadLock(operations, file, exchange);
 
-        String target = file.getFileName();
-        FileLock lock = exchange.getProperty(Exchange.FILE_LOCK_EXCLUSIVE_LOCK, FileLock.class);
-        RandomAccessFile rac = exchange.getProperty(Exchange.FILE_LOCK_RANDOM_ACCESS_FILE, RandomAccessFile.class);
+        FileLock lock = exchange.getProperty(asReadLockKey(file, Exchange.FILE_LOCK_EXCLUSIVE_LOCK), FileLock.class);
+        RandomAccessFile rac = exchange.getProperty(asReadLockKey(file, Exchange.FILE_LOCK_EXCLUSIVE_LOCK), RandomAccessFile.class);
 
+        String target = file.getFileName();
         if (lock != null) {
             Channel channel = lock.acquiredBy();
             try {
@@ -187,4 +188,12 @@ public class FileLockExclusiveReadLockStrategy extends MarkerFileExclusiveReadLo
         this.readLockLoggingLevel = readLockLoggingLevel;
     }
 
+    private static String asReadLockKey(GenericFile file, String key) {
+        // use the copy from absolute path as that was the original path of the file when the lock was acquired
+        // for example if the file consumer uses preMove then the file is moved and therefore has another name
+        // that would no longer match
+        String path = file.getCopyFromAbsoluteFilePath() != null ? file.getCopyFromAbsoluteFilePath() : file.getAbsoluteFilePath();
+        return path + "-" + key;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/0a6dc865/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java
index 6deebff..b9676f5 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java
@@ -69,8 +69,10 @@ public class MarkerFileExclusiveReadLockStrategy implements GenericFileExclusive
 
         // create a plain file as marker filer for locking (do not use FileLock)
         boolean acquired = FileUtil.createNewFile(new File(lockFileName));
-        exchange.setProperty(Exchange.FILE_LOCK_FILE_ACQUIRED, acquired);
-        exchange.setProperty(Exchange.FILE_LOCK_FILE_NAME, lockFileName);
+
+        // store read-lock state
+        exchange.setProperty(asReadLockKey(file, Exchange.FILE_LOCK_FILE_ACQUIRED), acquired);
+        exchange.setProperty(asReadLockKey(file, Exchange.FILE_LOCK_FILE_NAME), lockFileName);
 
         return acquired;
     }
@@ -83,9 +85,11 @@ public class MarkerFileExclusiveReadLockStrategy implements GenericFileExclusive
             return;
         }
 
+        boolean acquired = exchange.getProperty(asReadLockKey(file, Exchange.FILE_LOCK_FILE_ACQUIRED), false, Boolean.class);
+
         // only release the file if camel get the lock before
-        if (exchange.getProperty(Exchange.FILE_LOCK_FILE_ACQUIRED, false, Boolean.class)) {
-            String lockFileName = exchange.getProperty(Exchange.FILE_LOCK_FILE_NAME, getLockFileName(file), String.class);
+        if (acquired) {
+            String lockFileName = exchange.getProperty(asReadLockKey(file, Exchange.FILE_LOCK_FILE_NAME), String.class);
             File lock = new File(lockFileName);
 
             if (lock.exists()) {
@@ -139,4 +143,12 @@ public class MarkerFileExclusiveReadLockStrategy implements GenericFileExclusive
         return file.getAbsoluteFilePath() + FileComponent.DEFAULT_LOCK_FILE_POSTFIX;
     }
 
+    private static String asReadLockKey(GenericFile file, String key) {
+        // use the copy from absolute path as that was the original path of the file when the lock was acquired
+        // for example if the file consumer uses preMove then the file is moved and therefore has another name
+        // that would no longer match
+        String path = file.getCopyFromAbsoluteFilePath() != null ? file.getCopyFromAbsoluteFilePath() : file.getAbsoluteFilePath();
+        return path + "-" + key;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/0a6dc865/camel-core/src/test/java/org/apache/camel/component/file/MarkerFileExclusiveReadLockStrategyUnlockTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/file/MarkerFileExclusiveReadLockStrategyUnlockTest.java b/camel-core/src/test/java/org/apache/camel/component/file/MarkerFileExclusiveReadLockStrategyUnlockTest.java
new file mode 100644
index 0000000..b12b8b2
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/component/file/MarkerFileExclusiveReadLockStrategyUnlockTest.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import java.io.FileOutputStream;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.builder.RouteBuilder;
+
+public class MarkerFileExclusiveReadLockStrategyUnlockTest extends ContextTestSupport {
+
+    @Override
+    protected void setUp() throws Exception {
+        setupDirectory();
+        super.setUp();
+    }
+
+    public void testUnlocking() throws Exception {
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(1).create();
+        writeFiles();
+        boolean done = notify.matches(5, TimeUnit.SECONDS);
+
+        assertTrue("Route should be done processing 1 exchanges", done);
+
+        assertFileNotExists("target/marker-unlock/input-a/file1.dat.camelLock");
+        assertFileNotExists("target/marker-unlock/input-b/file2.dat.camelLock");
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("file:target/marker-unlock/input-a?fileName=file1.dat&readLock=markerFile")
+                        .pollEnrich("file:target/marker-unlock/input-b?fileName=file2.dat&readLock=markerFile")
+                        .to("mock:result");
+            }
+        };
+    }
+
+    private void setupDirectory() {
+        deleteDirectory("target/marker-unlock/");
+        createDirectory("target/marker-unlock/input-a");
+        createDirectory("target/marker-unlock/input-b");
+    }
+
+    private void writeFiles() throws Exception {
+        FileOutputStream fos1 = new FileOutputStream("target/marker-unlock/input-a/file1.dat");
+        FileOutputStream fos2 = new FileOutputStream("target/marker-unlock/input-b/file2.dat");
+        fos1.write("File-1".getBytes());
+        fos2.write("File-2".getBytes());
+        fos1.flush();
+        fos1.close();
+        fos2.flush();
+        fos2.close();
+    }
+}