You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/03/27 12:25:11 UTC

svn commit: r1085905 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/component/file/ main/java/org/apache/camel/component/file/strategy/ test/java/org/apache/camel/component/file/strategy/

Author: davsclaus
Date: Sun Mar 27 10:25:11 2011
New Revision: 1085905

URL: http://svn.apache.org/viewvc?rev=1085905&view=rev
Log:
CAMEL-3789: Fixed marker file strategy not being thread safe. Thanks to Maria Iracheta for the patch.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategyTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileOperations.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileDeleteProcessStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileOperations.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileOperations.java?rev=1085905&r1=1085904&r2=1085905&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileOperations.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileOperations.java Sun Mar 27 10:25:11 2011
@@ -54,7 +54,7 @@ public class FileOperations implements G
         this.endpoint = (FileEndpoint) endpoint;
     }
 
-    public boolean deleteFile(String name) throws GenericFileOperationFailedException {        
+    public boolean deleteFile(String name) throws GenericFileOperationFailedException {
         File file = new File(name);
         return FileUtil.deleteFile(file);
     }
@@ -71,7 +71,7 @@ public class FileOperations implements G
     }
 
     public boolean buildDirectory(String directory, boolean absolute) throws GenericFileOperationFailedException {
-        ObjectHelper.notNull(endpoint, "endpoint");       
+        ObjectHelper.notNull(endpoint, "endpoint");
 
         // always create endpoint defined directory
         if (endpoint.isAutoCreate() && !endpoint.getFile().exists()) {
@@ -106,12 +106,17 @@ public class FileOperations implements G
             }
         }
 
-        if (path.isDirectory() && path.exists()) {
-            // the directory already exists
-            return true;
-        } else {
-            LOG.trace("Building directory: {}", path);
-            return path.mkdirs();
+        // We need to make sure that this is thread-safe and only one thread tries to create the path directory at the same time.
+        synchronized (this) {
+            if (path.isDirectory() && path.exists()) {
+                // the directory already exists
+                return true;
+            } else {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Building directory: " + path);
+                }
+                return path.mkdirs();
+            }
         }
     }
 
@@ -152,7 +157,9 @@ public class FileOperations implements G
         if (file.exists()) {
             if (endpoint.getFileExist() == GenericFileExist.Ignore) {
                 // ignore but indicate that the file was written
-                LOG.trace("An existing file already exists: {}. Ignore and do not override it.", file);
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("An existing file already exists: " + file + ". Ignore and do not override it.");
+                }
                 return true;
             } else if (endpoint.getFileExist() == GenericFileExist.Fail) {
                 throw new GenericFileOperationFailedException("File already exist: " + file + ". Cannot write new file.");
@@ -168,7 +175,7 @@ public class FileOperations implements G
             // is the body file based
             File source = null;
             // get the File Object from in message
-            source = exchange.getIn().getBody(File.class);            
+            source = exchange.getIn().getBody(File.class);
 
             if (source != null) {
                 // okay we know the body is a file type
@@ -222,13 +229,17 @@ public class FileOperations implements G
             }
             if (last != null) {
                 boolean result = file.setLastModified(last);
-                LOG.trace("Keeping last modified timestamp: {} on file: {} with result: {}", new Object[]{last, file, result});
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Keeping last modified timestamp: " + last + " on file: " + file + " with result: " + result);
+                }
             }
         }
     }
 
     private boolean writeFileByLocalWorkPath(File source, File file) {
-        LOG.trace("Using local work file being renamed from: {} to: {}", source, file);
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Using local work file being renamed from: " + source + " to: " + file);
+        }
 
         return FileUtil.renameFile(source, file);
     }
@@ -239,7 +250,9 @@ public class FileOperations implements G
         try {
             out = prepareOutputFileChannel(target, out);
 
-            LOG.trace("Using FileChannel to transfer from: {} to: {}", in, out);
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Using FileChannel to transfer from: " + in + " to: " + out);
+            }
 
             long size = in.size();
             long position = 0;
@@ -257,7 +270,9 @@ public class FileOperations implements G
         try {
             out = prepareOutputFileChannel(target, out);
 
-            LOG.trace("Using InputStream to transfer from: {} to: {}", in, out);
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Using InputStream to transfer from: " + in + " to: " + out);
+            }
             int size = endpoint.getBufferSize();
             byte[] buffer = new byte[size];
             ByteBuffer byteBuffer = ByteBuffer.wrap(buffer);
@@ -277,7 +292,7 @@ public class FileOperations implements G
 
     /**
      * Creates and prepares the output file channel. Will position itself in correct position if the file is writable
-     *  eg. it should append or override any existing content.
+     * eg. it should append or override any existing content.
      */
     private FileChannel prepareOutputFileChannel(File target, FileChannel out) throws IOException {
         if (endpoint.getFileExist() == GenericFileExist.Append) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileDeleteProcessStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileDeleteProcessStrategy.java?rev=1085905&r1=1085904&r2=1085905&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileDeleteProcessStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileDeleteProcessStrategy.java Sun Mar 27 10:25:11 2011
@@ -29,19 +29,22 @@ public class GenericFileDeleteProcessStr
 
     @Override
     public boolean begin(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
-        // must invoke super
-        boolean result = super.begin(operations, endpoint, exchange, file);
-        if (!result) {
-            return false;
-        }
+
+        // We need to invoke super, but to the file that we are going to use for processing, so we do super after renaming.
+        GenericFile<T> to = file;
 
         if (beginRenamer != null) {
             GenericFile<T> newName = beginRenamer.renameFile(exchange, file);
-            GenericFile<T> to = renameFile(operations, file, newName);
+            to = renameFile(operations, file, newName);
             if (to != null) {
                 to.bindToExchange(exchange);
             }
         }
+        // must invoke super
+        boolean result = super.begin(operations, endpoint, exchange, to);
+        if (!result) {
+            return false;
+        }
 
         return true;
     }
@@ -79,7 +82,7 @@ public class GenericFileDeleteProcessStr
             throw new GenericFileOperationFailedException("Cannot delete file: " + file);
         }
     }
-    
+
     @Override
     public void rollback(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
         // must invoke super
@@ -98,7 +101,7 @@ public class GenericFileDeleteProcessStr
             renameFile(operations, file, newName);
         }
     }
-    
+
     public GenericFileRenamer<T> getFailureRenamer() {
         return failureRenamer;
     }
@@ -114,4 +117,4 @@ public class GenericFileDeleteProcessStr
     public void setBeginRenamer(GenericFileRenamer<T> beginRenamer) {
         this.beginRenamer = beginRenamer;
     }
-}
\ No newline at end of file
+}

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java?rev=1085905&r1=1085904&r2=1085905&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java Sun Mar 27 10:25:11 2011
@@ -31,19 +31,22 @@ public class GenericFileRenameProcessStr
 
     @Override
     public boolean begin(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
-        // must invoke super
-        boolean result = super.begin(operations, endpoint, exchange, file);
-        if (!result) {
-            return false;
-        }
+
+        // We need to invoke super, but to the file that we are going to use for processing, so we do super after renaming.
+        GenericFile<T> to = file;
 
         if (beginRenamer != null) {
             GenericFile<T> newName = beginRenamer.renameFile(exchange, file);
-            GenericFile<T> to = renameFile(operations, file, newName);
+            to = renameFile(operations, file, newName);
             if (to != null) {
                 to.bindToExchange(exchange);
             }
         }
+        // must invoke super
+        boolean result = super.begin(operations, endpoint, exchange, to);
+        if (!result) {
+            return false;
+        }
 
         return true;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java?rev=1085905&r1=1085904&r2=1085905&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java Sun Mar 27 10:25:11 2011
@@ -34,8 +34,6 @@ import org.slf4j.LoggerFactory;
  */
 public class MarkerFileExclusiveReadLockStrategy implements GenericFileExclusiveReadLockStrategy<File> {
     private static final transient Logger LOG = LoggerFactory.getLogger(MarkerFileExclusiveReadLockStrategy.class);
-    private File lock;
-    private String lockFileName;
 
     public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) {
         String dir = endpoint.getConfiguration().getDirectory();
@@ -50,28 +48,25 @@ public class MarkerFileExclusiveReadLock
 
     public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations,
                                             GenericFile<File> file, Exchange exchange) throws Exception {
-        lockFileName = file.getAbsoluteFilePath() + FileComponent.DEFAULT_LOCK_FILE_POSTFIX;
+        String lockFileName = getLockFileName(file);
         LOG.trace("Locking the file: {} using the lock file name: {}", file, lockFileName);
 
         // create a plain file as marker filer for locking (do not use FileLock)
-        lock = new File(lockFileName);
+        File lock = new File(lockFileName);
         boolean acquired = lock.createNewFile();
-        if (!acquired) {
-            lock = null;
-
-        }
 
         return acquired;
     }
 
     public void releaseExclusiveReadLock(GenericFileOperations<File> operations,
                                          GenericFile<File> file, Exchange exchange) throws Exception {
-        if (lock != null) {
-            LOG.trace("Unlocking file: {}", lockFileName);
+        String lockFileName = getLockFileName(file);
+        File lock = new File(lockFileName);
 
-            boolean deleted = FileUtil.deleteFile(lock);
-            LOG.trace("Lock file: {} was deleted: {}", lockFileName, deleted);
-        }
+        LOG.trace("Unlocking file: {}", lockFileName);
+
+        boolean deleted = FileUtil.deleteFile(lock);
+        LOG.trace("Lock file: {} was deleted: {}", lockFileName, deleted);
     }
 
     public void setTimeout(long timeout) {
@@ -101,4 +96,8 @@ public class MarkerFileExclusiveReadLock
         }
     }
 
+    private static String getLockFileName(GenericFile<File> file) {
+        return file.getAbsoluteFilePath() + FileComponent.DEFAULT_LOCK_FILE_POSTFIX;
+    }
+
 }

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategyTest.java?rev=1085905&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategyTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategyTest.java Sun Mar 27 10:25:11 2011
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.strategy;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Tests the MarkerFileExclusiveReadLockStrategy in a multi-threaded scenario.
+ */
+public class MarkerFileExclusiveReadLockStrategyTest extends ContextTestSupport {
+
+    private static final transient Log LOG = LogFactory.getLog(MarkerFileExclusiveReadLockStrategyTest.class);
+    private static final int NUMBER_OF_THREADS = 5;
+    private static final int NUMBER_OF_LINES = 20;
+    // upper bound for waitUntilCompleted(), so a completion that is never counted cannot hang the test
+    private static final long COMPLETION_TIMEOUT_MILLIS = 10000;
+    private final AtomicInteger numberOfFilesProcessed = new AtomicInteger(0);
+
+    public void testMultithreadedLocking() throws Exception {
+        deleteDirectory("target/marker/");
+        createDirectory("target/marker/in");
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(2);
+        mock.expectedFileExists("target/marker/out/file1.dat");
+        mock.expectedFileExists("target/marker/out/file2.dat");
+
+        writeFiles();
+
+        assertMockEndpointsSatisfied();
+
+        assertFileContent("target/marker/out/file1.dat");
+        assertFileContent("target/marker/out/file2.dat");
+
+        // wait for the onCompletion processor to have counted both files before checking the cleanup
+        waitUntilCompleted();
+
+        assertFileDoesNotExists("target/marker/in/file1.dat.camelLock");
+        assertFileDoesNotExists("target/marker/in/file2.dat.camelLock");
+
+        assertFileDoesNotExists("target/marker/in/file1.dat");
+        assertFileDoesNotExists("target/marker/in/file2.dat");
+
+        assertEquals(2, this.numberOfFilesProcessed.get());
+    }
+
+    /**
+     * Writes the two input files, each holding NUMBER_OF_LINES lines of the form "Line i".
+     */
+    private void writeFiles() throws Exception {
+        LOG.debug("Writing files...");
+
+        FileOutputStream fos = new FileOutputStream("target/marker/in/file1.dat");
+        try {
+            FileOutputStream fos2 = new FileOutputStream("target/marker/in/file2.dat");
+            try {
+                for (int i = 0; i < NUMBER_OF_LINES; i++) {
+                    byte[] line = ("Line " + i + "\n").getBytes();
+                    fos.write(line);
+                    fos2.write(line);
+                    LOG.debug("Writing line " + i);
+                }
+            } finally {
+                // close in finally so a failed write does not leak the file handle
+                fos2.close();
+            }
+        } finally {
+            fos.close();
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("file:target/marker/in?readLock=markerFile")
+                        .onCompletion()
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                numberOfFilesProcessed.incrementAndGet();
+                            }
+                        })
+                        .end()
+                        .threads(NUMBER_OF_THREADS)
+                        .to("file:target/marker/out", "mock:result");
+            }
+        };
+    }
+
+    /**
+     * Polls until both files have been counted or the timeout elapses. On timeout it simply returns,
+     * leaving it to the assertions that follow in the caller to fail the test.
+     */
+    private void waitUntilCompleted() throws InterruptedException {
+        long deadline = System.currentTimeMillis() + COMPLETION_TIMEOUT_MILLIS;
+        while (numberOfFilesProcessed.get() < 2 && System.currentTimeMillis() < deadline) {
+            Thread.sleep(10);
+        }
+    }
+
+    /**
+     * Asserts that the first NUMBER_OF_LINES lines of the given file are "Line 0", "Line 1", ...
+     */
+    private void assertFileContent(String filename) {
+        String content = context.getTypeConverter().convertTo(String.class, new File(filename).getAbsoluteFile());
+        String[] lines = content.split("\n");
+        for (int i = 0; i < NUMBER_OF_LINES; i++) {
+            assertEquals("Line " + i, lines[i]);
+        }
+    }
+
+    private static void assertFileDoesNotExists(String filename) {
+        File file = new File(filename).getAbsoluteFile();
+        assertFalse("File " + filename + " should not exist, it should have been deleted after being processed", file.exists());
+    }
+
+}