You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/02/08 16:35:11 UTC

svn commit: r1068443 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/component/file/ main/java/org/apache/camel/util/concurrent/ test/java/org/apache/camel/component/file/

Author: davsclaus
Date: Tue Feb  8 15:35:11 2011
New Revision: 1068443

URL: http://svn.apache.org/viewvc?rev=1068443&view=rev
Log:
CAMEL-3584: Fixed so that concurrent writes to the same file using the file producer are synchronized.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentWriteAppendSameFileTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProducer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProducer.java?rev=1068443&r1=1068442&r2=1068443&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProducer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProducer.java Tue Feb  8 15:35:11 2011
@@ -17,17 +17,20 @@
 package org.apache.camel.component.file;
 
 import java.io.File;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
-import org.apache.camel.ExpressionIllegalSyntaxException;
 import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.impl.DefaultProducer;
 import org.apache.camel.language.simple.SimpleLanguage;
 import org.apache.camel.spi.Language;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.FileUtil;
+import org.apache.camel.util.LRUCache;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,7 +41,9 @@ public class GenericFileProducer<T> exte
     protected final transient Logger log = LoggerFactory.getLogger(getClass());
     protected final GenericFileEndpoint<T> endpoint;
     protected GenericFileOperations<T> operations;
-    
+    // assume at most 100 different files are written concurrently by the same file producer
+    private final LRUCache<String, Lock> locks = new LRUCache<String, Lock>(100);
+
     protected GenericFileProducer(GenericFileEndpoint<T> endpoint, GenericFileOperations<T> operations) {
         super(endpoint);
         this.endpoint = endpoint;
@@ -56,8 +61,29 @@ public class GenericFileProducer<T> exte
     public void process(Exchange exchange) throws Exception {
         Exchange fileExchange = endpoint.createExchange(exchange);
         endpoint.configureExchange(fileExchange);
-        processExchange(fileExchange);
-        ExchangeHelper.copyResults(exchange, fileExchange);
+
+        String target = createFileName(exchange);
+
+        // use lock for same file name to avoid concurrent writes to the same file
+        // for example when you concurrently append to the same file
+        Lock lock;
+        synchronized (locks) {
+            lock = locks.get(target);
+            if (lock == null) {
+                lock = new ReentrantLock();
+                locks.put(target, lock);
+            }
+        }
+
+        lock.lock();
+        try {
+            processExchange(fileExchange, target);
+            ExchangeHelper.copyResults(exchange, fileExchange);
+        } finally {
+            // do not remove the lock from the cache, as the locks cache has an upper bound;
+            // this ensures the locks are appropriately reused
+            lock.unlock();
+        }
     }
 
     /**
@@ -75,16 +101,15 @@ public class GenericFileProducer<T> exte
      * Perform the work to process the fileExchange
      *
      * @param exchange fileExchange
+     * @param target   the target filename
      * @throws Exception is thrown if some error
      */
-    protected void processExchange(Exchange exchange) throws Exception {
+    protected void processExchange(Exchange exchange, String target) throws Exception {
         if (log.isTraceEnabled()) {
-            log.trace("Processing " + exchange);
+            log.trace("Processing file: " + target + " for exchange: " + exchange);
         }
 
         try {
-            String target = createFileName(exchange);
-
             preWriteCheck();
 
             // should we write to a temporary name and then afterwards rename to real target
@@ -335,4 +360,15 @@ public class GenericFileProducer<T> exte
         }
     }
 
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        ServiceHelper.startService(locks);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        ServiceHelper.stopService(locks);
+        super.doStop();
+    }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=1068443&r1=1068442&r2=1068443&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java Tue Feb  8 15:35:11 2011
@@ -360,6 +360,10 @@ public final class ExecutorServiceHelper
             }
             return answer;
         }
+
+        public String toString() {
+            return "CamelThreadFactory[" + name + "]";
+        }
     }
 
 }

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentWriteAppendSameFileTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentWriteAppendSameFileTest.java?rev=1068443&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentWriteAppendSameFileTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentWriteAppendSameFileTest.java Tue Feb  8 15:35:11 2011
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version $Revision$
+ */
+public class FileConcurrentWriteAppendSameFileTest extends ContextTestSupport {
+
+    private final int size = 5000;
+
+    @Override
+    protected void setUp() throws Exception {
+        deleteDirectory("target/concurrent");
+        super.setUp();
+    }
+
+    public void testConcurrentAppend() throws Exception {
+        // create file with many lines
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < size; i++) {
+            sb.append("Line " + i + "\n");
+        }
+
+        template.sendBodyAndHeader("file:target/concurrent", sb.toString(), Exchange.FILE_NAME, "input.txt");
+
+        // start route
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(size);
+        mock.expectsNoDuplicates(body());
+        mock.setResultWaitTime(30000);
+
+        context.startRoute("foo");
+
+        assertMockEndpointsSatisfied();
+
+        // check the file has 'size' lines (one per line of the input file)
+        String txt = context.getTypeConverter().convertTo(String.class, new File("target/concurrent/outbox/result.txt"));
+        assertNotNull(txt);
+
+        String[] lines = txt.split("\n");
+        assertEquals("Should be " + size + " lines", size, lines.length);
+
+        // all 'size' lines should be unique
+        Set<String> rows = new LinkedHashSet<String>(Arrays.asList(lines));
+        assertEquals("Should be " + size + " unique lines", size, rows.size());
+
+        log.info(txt);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("file:target/concurrent").routeId("foo").noAutoStartup()
+                    .split(body().tokenize("\n")).parallelProcessing().streaming()
+                        .setBody(body().append(":Status=OK\n"))
+                        .to("file:target/concurrent/outbox?fileExist=Append&fileName=result.txt")
+                        .to("mock:result")
+                    .end();
+            }
+        };
+    }
+}