You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/03/27 12:25:11 UTC
svn commit: r1085905 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/component/file/
main/java/org/apache/camel/component/file/strategy/
test/java/org/apache/camel/component/file/strategy/
Author: davsclaus
Date: Sun Mar 27 10:25:11 2011
New Revision: 1085905
URL: http://svn.apache.org/viewvc?rev=1085905&view=rev
Log:
CAMEL-3789: Fixed marker file strategy not being thread safe. Thanks to Maria Iracheta for the patch.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategyTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileOperations.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileDeleteProcessStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileOperations.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileOperations.java?rev=1085905&r1=1085904&r2=1085905&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileOperations.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileOperations.java Sun Mar 27 10:25:11 2011
@@ -54,7 +54,7 @@ public class FileOperations implements G
this.endpoint = (FileEndpoint) endpoint;
}
- public boolean deleteFile(String name) throws GenericFileOperationFailedException {
+ public boolean deleteFile(String name) throws GenericFileOperationFailedException {
File file = new File(name);
return FileUtil.deleteFile(file);
}
@@ -71,7 +71,7 @@ public class FileOperations implements G
}
public boolean buildDirectory(String directory, boolean absolute) throws GenericFileOperationFailedException {
- ObjectHelper.notNull(endpoint, "endpoint");
+ ObjectHelper.notNull(endpoint, "endpoint");
// always create endpoint defined directory
if (endpoint.isAutoCreate() && !endpoint.getFile().exists()) {
@@ -106,12 +106,17 @@ public class FileOperations implements G
}
}
- if (path.isDirectory() && path.exists()) {
- // the directory already exists
- return true;
- } else {
- LOG.trace("Building directory: {}", path);
- return path.mkdirs();
+ // We need to make sure that this is thread-safe and only one thread tries to create the path directory at the same time.
+ synchronized (this) {
+ if (path.isDirectory() && path.exists()) {
+ // the directory already exists
+ return true;
+ } else {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Building directory: " + path);
+ }
+ return path.mkdirs();
+ }
}
}
@@ -152,7 +157,9 @@ public class FileOperations implements G
if (file.exists()) {
if (endpoint.getFileExist() == GenericFileExist.Ignore) {
// ignore but indicate that the file was written
- LOG.trace("An existing file already exists: {}. Ignore and do not override it.", file);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("An existing file already exists: " + file + ". Ignore and do not override it.");
+ }
return true;
} else if (endpoint.getFileExist() == GenericFileExist.Fail) {
throw new GenericFileOperationFailedException("File already exist: " + file + ". Cannot write new file.");
@@ -168,7 +175,7 @@ public class FileOperations implements G
// is the body file based
File source = null;
// get the File Object from in message
- source = exchange.getIn().getBody(File.class);
+ source = exchange.getIn().getBody(File.class);
if (source != null) {
// okay we know the body is a file type
@@ -222,13 +229,17 @@ public class FileOperations implements G
}
if (last != null) {
boolean result = file.setLastModified(last);
- LOG.trace("Keeping last modified timestamp: {} on file: {} with result: {}", new Object[]{last, file, result});
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Keeping last modified timestamp: " + last + " on file: " + file + " with result: " + result);
+ }
}
}
}
private boolean writeFileByLocalWorkPath(File source, File file) {
- LOG.trace("Using local work file being renamed from: {} to: {}", source, file);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Using local work file being renamed from: " + source + " to: " + file);
+ }
return FileUtil.renameFile(source, file);
}
@@ -239,7 +250,9 @@ public class FileOperations implements G
try {
out = prepareOutputFileChannel(target, out);
- LOG.trace("Using FileChannel to transfer from: {} to: {}", in, out);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Using FileChannel to transfer from: " + in + " to: " + out);
+ }
long size = in.size();
long position = 0;
@@ -257,7 +270,9 @@ public class FileOperations implements G
try {
out = prepareOutputFileChannel(target, out);
- LOG.trace("Using InputStream to transfer from: {} to: {}", in, out);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Using InputStream to transfer from: " + in + " to: " + out);
+ }
int size = endpoint.getBufferSize();
byte[] buffer = new byte[size];
ByteBuffer byteBuffer = ByteBuffer.wrap(buffer);
@@ -277,7 +292,7 @@ public class FileOperations implements G
/**
* Creates and prepares the output file channel. Will position itself in correct position if the file is writable
- * eg. it should append or override any existing content.
+ * eg. it should append or override any existing content.
*/
private FileChannel prepareOutputFileChannel(File target, FileChannel out) throws IOException {
if (endpoint.getFileExist() == GenericFileExist.Append) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileDeleteProcessStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileDeleteProcessStrategy.java?rev=1085905&r1=1085904&r2=1085905&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileDeleteProcessStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileDeleteProcessStrategy.java Sun Mar 27 10:25:11 2011
@@ -29,19 +29,22 @@ public class GenericFileDeleteProcessStr
@Override
public boolean begin(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
- // must invoke super
- boolean result = super.begin(operations, endpoint, exchange, file);
- if (!result) {
- return false;
- }
+
+ // Invoke super on the file that will actually be processed, i.e. only after any begin-rename has been applied.
+ GenericFile<T> to = file;
if (beginRenamer != null) {
GenericFile<T> newName = beginRenamer.renameFile(exchange, file);
- GenericFile<T> to = renameFile(operations, file, newName);
+ to = renameFile(operations, file, newName);
if (to != null) {
to.bindToExchange(exchange);
}
}
+ // must invoke super
+ boolean result = super.begin(operations, endpoint, exchange, to);
+ if (!result) {
+ return false;
+ }
return true;
}
@@ -79,7 +82,7 @@ public class GenericFileDeleteProcessStr
throw new GenericFileOperationFailedException("Cannot delete file: " + file);
}
}
-
+
@Override
public void rollback(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
// must invoke super
@@ -98,7 +101,7 @@ public class GenericFileDeleteProcessStr
renameFile(operations, file, newName);
}
}
-
+
public GenericFileRenamer<T> getFailureRenamer() {
return failureRenamer;
}
@@ -114,4 +117,4 @@ public class GenericFileDeleteProcessStr
public void setBeginRenamer(GenericFileRenamer<T> beginRenamer) {
this.beginRenamer = beginRenamer;
}
-}
\ No newline at end of file
+}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java?rev=1085905&r1=1085904&r2=1085905&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java Sun Mar 27 10:25:11 2011
@@ -31,19 +31,22 @@ public class GenericFileRenameProcessStr
@Override
public boolean begin(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
- // must invoke super
- boolean result = super.begin(operations, endpoint, exchange, file);
- if (!result) {
- return false;
- }
+
+ // Invoke super on the file that will actually be processed, i.e. only after any begin-rename has been applied.
+ GenericFile<T> to = file;
if (beginRenamer != null) {
GenericFile<T> newName = beginRenamer.renameFile(exchange, file);
- GenericFile<T> to = renameFile(operations, file, newName);
+ to = renameFile(operations, file, newName);
if (to != null) {
to.bindToExchange(exchange);
}
}
+ // must invoke super
+ boolean result = super.begin(operations, endpoint, exchange, to);
+ if (!result) {
+ return false;
+ }
return true;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java?rev=1085905&r1=1085904&r2=1085905&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java Sun Mar 27 10:25:11 2011
@@ -34,8 +34,6 @@ import org.slf4j.LoggerFactory;
*/
public class MarkerFileExclusiveReadLockStrategy implements GenericFileExclusiveReadLockStrategy<File> {
private static final transient Logger LOG = LoggerFactory.getLogger(MarkerFileExclusiveReadLockStrategy.class);
- private File lock;
- private String lockFileName;
public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) {
String dir = endpoint.getConfiguration().getDirectory();
@@ -50,28 +48,25 @@ public class MarkerFileExclusiveReadLock
public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations,
GenericFile<File> file, Exchange exchange) throws Exception {
- lockFileName = file.getAbsoluteFilePath() + FileComponent.DEFAULT_LOCK_FILE_POSTFIX;
+ String lockFileName = getLockFileName(file);
LOG.trace("Locking the file: {} using the lock file name: {}", file, lockFileName);
// create a plain file as marker filer for locking (do not use FileLock)
- lock = new File(lockFileName);
+ File lock = new File(lockFileName);
boolean acquired = lock.createNewFile();
- if (!acquired) {
- lock = null;
-
- }
return acquired;
}
public void releaseExclusiveReadLock(GenericFileOperations<File> operations,
GenericFile<File> file, Exchange exchange) throws Exception {
- if (lock != null) {
- LOG.trace("Unlocking file: {}", lockFileName);
+ String lockFileName = getLockFileName(file);
+ File lock = new File(lockFileName);
- boolean deleted = FileUtil.deleteFile(lock);
- LOG.trace("Lock file: {} was deleted: {}", lockFileName, deleted);
- }
+ LOG.trace("Unlocking file: {}", lockFileName);
+
+ boolean deleted = FileUtil.deleteFile(lock);
+ LOG.trace("Lock file: {} was deleted: {}", lockFileName, deleted);
}
public void setTimeout(long timeout) {
@@ -101,4 +96,8 @@ public class MarkerFileExclusiveReadLock
}
}
+ private static String getLockFileName(GenericFile<File> file) {
+ return file.getAbsoluteFilePath() + FileComponent.DEFAULT_LOCK_FILE_POSTFIX;
+ }
+
}
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategyTest.java?rev=1085905&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategyTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategyTest.java Sun Mar 27 10:25:11 2011
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.strategy;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Tests the MarkerFileExclusiveReadLockStrategy in a multi-threaded scenario.
+ */
+public class MarkerFileExclusiveReadLockStrategyTest extends ContextTestSupport {
+
+ private static final transient Log LOG = LogFactory.getLog(MarkerFileExclusiveReadLockStrategyTest.class);
+ private static int NUMBER_OF_THREADS = 5;
+ private AtomicInteger numberOfFilesProcessed = new AtomicInteger(0);
+
+ public void testMultithreadedLocking() throws Exception {
+ deleteDirectory("target/marker/");
+ createDirectory("target/marker/in");
+
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(2);
+ mock.expectedFileExists("target/marker/out/file1.dat");
+ mock.expectedFileExists("target/marker/out/file2.dat");
+
+ writeFiles();
+
+ assertMockEndpointsSatisfied();
+
+ String content = context.getTypeConverter().convertTo(String.class, new File("target/marker/out/file1.dat").getAbsoluteFile());
+ String[] lines = content.split("\n");
+ for (int i = 0; i < 20; i++) {
+ assertEquals("Line " + i, lines[i]);
+ }
+
+ content = context.getTypeConverter().convertTo(String.class, new File("target/marker/out/file2.dat").getAbsoluteFile());
+ lines = content.split("\n");
+ for (int i = 0; i < 20; i++) {
+ assertEquals("Line " + i, lines[i]);
+ }
+
+ waitUntilCompleted();
+
+ assertFileDoesNotExists("target/marker/in/file1.dat.camelLock");
+ assertFileDoesNotExists("target/marker/in/file2.dat.camelLock");
+
+ assertFileDoesNotExists("target/marker/in/file1.dat");
+ assertFileDoesNotExists("target/marker/in/file2.dat");
+
+ assertEquals(2, this.numberOfFilesProcessed.get());
+ }
+
+ private void writeFiles() throws Exception {
+ LOG.debug("Writing files...");
+
+ FileOutputStream fos = new FileOutputStream("target/marker/in/file1.dat");
+ FileOutputStream fos2 = new FileOutputStream("target/marker/in/file2.dat");
+ for (int i = 0; i < 20; i++) {
+ fos.write(("Line " + i + "\n").getBytes());
+ fos2.write(("Line " + i + "\n").getBytes());
+ LOG.debug("Writing line " + i);
+ }
+
+ fos.flush();
+ fos.close();
+ fos2.flush();
+ fos2.close();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("file:target/marker/in?readLock=markerFile")
+ .onCompletion()
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ numberOfFilesProcessed.addAndGet(1);
+ }
+ })
+ .end()
+ .threads(NUMBER_OF_THREADS)
+ .to("file:target/marker/out", "mock:result");
+ }
+ };
+ }
+
+ private void waitUntilCompleted() {
+ while (this.numberOfFilesProcessed.get() < 2) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+
+ private static void assertFileDoesNotExists(String filename) {
+ File file = new File(filename).getAbsoluteFile();
+ assertFalse("File " + filename + " should not exist, it should have been deleted after being processed", file.exists());
+ }
+
+}