You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/02/08 16:35:11 UTC
svn commit: r1068443 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/component/file/
main/java/org/apache/camel/util/concurrent/
test/java/org/apache/camel/component/file/
Author: davsclaus
Date: Tue Feb 8 15:35:11 2011
New Revision: 1068443
URL: http://svn.apache.org/viewvc?rev=1068443&view=rev
Log:
CAMEL-3584: Fixed concurrent writing the same file using file producer should be synchronized.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentWriteAppendSameFileTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProducer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProducer.java?rev=1068443&r1=1068442&r2=1068443&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProducer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProducer.java Tue Feb 8 15:35:11 2011
@@ -17,17 +17,20 @@
package org.apache.camel.component.file;
import java.io.File;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
-import org.apache.camel.ExpressionIllegalSyntaxException;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.language.simple.SimpleLanguage;
import org.apache.camel.spi.Language;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.FileUtil;
+import org.apache.camel.util.LRUCache;
import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,7 +41,9 @@ public class GenericFileProducer<T> exte
protected final transient Logger log = LoggerFactory.getLogger(getClass());
protected final GenericFileEndpoint<T> endpoint;
protected GenericFileOperations<T> operations;
-
+ // assume writing to 100 different files concurrently at most for the same file producer
+ private final LRUCache<String, Lock> locks = new LRUCache<String, Lock>(100);
+
protected GenericFileProducer(GenericFileEndpoint<T> endpoint, GenericFileOperations<T> operations) {
super(endpoint);
this.endpoint = endpoint;
@@ -56,8 +61,29 @@ public class GenericFileProducer<T> exte
public void process(Exchange exchange) throws Exception {
Exchange fileExchange = endpoint.createExchange(exchange);
endpoint.configureExchange(fileExchange);
- processExchange(fileExchange);
- ExchangeHelper.copyResults(exchange, fileExchange);
+
+ String target = createFileName(exchange);
+
+ // use lock for same file name to avoid concurrent writes to the same file
+ // for example when you concurrently append to the same file
+ Lock lock;
+ synchronized (locks) {
+ lock = locks.get(target);
+ if (lock == null) {
+ lock = new ReentrantLock();
+ locks.put(target, lock);
+ }
+ }
+
+ lock.lock();
+ try {
+ processExchange(fileExchange, target);
+ ExchangeHelper.copyResults(exchange, fileExchange);
+ } finally {
+ // do not remove as the locks cache has an upper bound
+ // this ensure the locks is appropriate reused
+ lock.unlock();
+ }
}
/**
@@ -75,16 +101,15 @@ public class GenericFileProducer<T> exte
* Perform the work to process the fileExchange
*
* @param exchange fileExchange
+ * @param target the target filename
* @throws Exception is thrown if some error
*/
- protected void processExchange(Exchange exchange) throws Exception {
+ protected void processExchange(Exchange exchange, String target) throws Exception {
if (log.isTraceEnabled()) {
- log.trace("Processing " + exchange);
+ log.trace("Processing file: " + target + " for exchange: " + exchange);
}
try {
- String target = createFileName(exchange);
-
preWriteCheck();
// should we write to a temporary name and then afterwards rename to real target
@@ -335,4 +360,15 @@ public class GenericFileProducer<T> exte
}
}
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+ ServiceHelper.startService(locks);
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ ServiceHelper.stopService(locks);
+ super.doStop();
+ }
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=1068443&r1=1068442&r2=1068443&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java Tue Feb 8 15:35:11 2011
@@ -360,6 +360,10 @@ public final class ExecutorServiceHelper
}
return answer;
}
+
+ public String toString() {
+ return "CamelThreadFactory[" + name + "]";
+ }
}
}
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentWriteAppendSameFileTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentWriteAppendSameFileTest.java?rev=1068443&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentWriteAppendSameFileTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentWriteAppendSameFileTest.java Tue Feb 8 15:35:11 2011
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version $Revision$
+ */
+public class FileConcurrentWriteAppendSameFileTest extends ContextTestSupport {
+
+ private final int size = 5000;
+
+ @Override
+ protected void setUp() throws Exception {
+ deleteDirectory("target/concurrent");
+ super.setUp();
+ }
+
+ public void testConcurrentAppend() throws Exception {
+ // create file with many lines
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < size; i++) {
+ sb.append("Line " + i + "\n");
+ }
+
+ template.sendBodyAndHeader("file:target/concurrent", sb.toString(), Exchange.FILE_NAME, "input.txt");
+
+ // start route
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(size);
+ mock.expectsNoDuplicates(body());
+ mock.setResultWaitTime(30000);
+
+ context.startRoute("foo");
+
+ assertMockEndpointsSatisfied();
+
+ // check the file has 10000 lines
+ String txt = context.getTypeConverter().convertTo(String.class, new File("target/concurrent/outbox/result.txt"));
+ assertNotNull(txt);
+
+ String[] lines = txt.split("\n");
+ assertEquals("Should be " + size + " lines", size, lines.length);
+
+ // should be 10000 unique
+ Set<String> rows = new LinkedHashSet<String>(Arrays.asList(lines));
+ assertEquals("Should be " + size + " unique lines", size, rows.size());
+
+ log.info(txt);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("file:target/concurrent").routeId("foo").noAutoStartup()
+ .split(body().tokenize("\n")).parallelProcessing().streaming()
+ .setBody(body().append(":Status=OK\n"))
+ .to("file:target/concurrent/outbox?fileExist=Append&fileName=result.txt")
+ .to("mock:result")
+ .end();
+ }
+ };
+ }
+}