You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/06/08 10:48:40 UTC

svn commit: r782560 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/component/file/ camel-core/src/main/java/org/apache/camel/component/file/strategy/ camel-core/src/test/java/org/apache/camel/component/file/stress/ components/camel-ftp/sr...

Author: davsclaus
Date: Mon Jun  8 08:48:39 2009
New Revision: 782560

URL: http://svn.apache.org/viewvc?rev=782560&view=rev
Log:
CAMEL-1670: Fixed problem with FTP consumer not detecting in-progress files.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameProcessStrategy.java   (contents, props changed)
      - copied, changed from r782538, camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java
    camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerAsyncStressTest.java   (contents, props changed)
      - copied, changed from r782538, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerBodyAsStringTest.java
Removed:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileExclusiveReadLockStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategyFactory.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressTest.java
    camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
    camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
    camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentRefTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileExclusiveReadLockStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileExclusiveReadLockStrategy.java?rev=782560&r1=782559&r2=782560&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileExclusiveReadLockStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileExclusiveReadLockStrategy.java Mon Jun  8 08:48:39 2009
@@ -26,12 +26,9 @@
  * <p/>
  * Camel supports out of the box the following strategies:
  * <ul>
- * <li>GenericFileRenameExclusiveReadLockStrategy waiting until its possible to
- * rename the file.</li>
- * <li>NewFileLockExclusiveReadLockStrategy acquiring a RW file lock for the duration
- * of the processing.</li>
- * <li>NewMarkerFileExclusiveReadLockStrategy using a marker file for acquiring
- * read lock.</li>
+ *   <li>FileRenameExclusiveReadLockStrategy waiting until its possible to rename the file.</li>
+ *   <li>FileLockExclusiveReadLockStrategy acquiring a RW file lock for the duration of the processing.</li>
+ *   <li>MarkerFileExclusiveReadLockStrategy using a marker file for acquiring read lock.</li>
  * </ul>
  */
 public interface GenericFileExclusiveReadLockStrategy<T> {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java?rev=782560&r1=782559&r2=782560&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java Mon Jun  8 08:48:39 2009
@@ -111,8 +111,8 @@
         return true;
     }
 
-    public void releaseExclusiveReadLock(GenericFileOperations<File> fileGenericFileOperations,
-                                         GenericFile<File> fileGenericFile, Exchange exchange) throws Exception {
+    public void releaseExclusiveReadLock(GenericFileOperations<File> operations,
+                                         GenericFile<File> file, Exchange exchange) throws Exception {
         FileLock lock = ExchangeHelper.getMandatoryProperty(exchange, "CamelFileLock", FileLock.class);
         String lockFileName = ExchangeHelper.getMandatoryProperty(exchange, "CamelFileLockName", String.class);
         Channel channel = lock.channel();

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java?rev=782560&r1=782559&r2=782560&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java Mon Jun  8 08:48:39 2009
@@ -48,7 +48,7 @@
             strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
             return strategy;
         } else if (moveExpression != null || preMoveExpression != null) {
-            GenericFileRenameProcessStrategy<File> strategy = new GenericFileRenameProcessStrategy<File>();
+            FileRenameProcessStrategy<File> strategy = new FileRenameProcessStrategy<File>();
             strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
             if (moveExpression != null) {
                 GenericFileExpressionRenamer<File> renamer = new GenericFileExpressionRenamer<File>();
@@ -63,7 +63,7 @@
             return strategy;
         } else {
             // default strategy will move files in a .camel/ subfolder where the file was consumed
-            GenericFileRenameProcessStrategy<File> strategy = new GenericFileRenameProcessStrategy<File>();
+            FileRenameProcessStrategy<File> strategy = new FileRenameProcessStrategy<File>();
             strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
             // use context to lookup language to let it be loose coupled
             Language language = context.resolveLanguage("file");

Copied: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameProcessStrategy.java (from r782538, camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameProcessStrategy.java?p2=camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameProcessStrategy.java&p1=camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java&r1=782538&r2=782560&rev=782560&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameProcessStrategy.java Mon Jun  8 08:48:39 2009
@@ -24,11 +24,11 @@
 import org.apache.camel.component.file.GenericFileOperationFailedException;
 import org.apache.camel.component.file.GenericFileOperations;
 
-public class GenericFileRenameProcessStrategy<T> extends GenericFileProcessStrategySupport<T> {
+public class FileRenameProcessStrategy<T> extends GenericFileProcessStrategySupport<T> {
     private GenericFileRenamer<T> beginRenamer;
     private GenericFileRenamer<T> commitRenamer;
 
-    public GenericFileRenameProcessStrategy() {
+    public FileRenameProcessStrategy() {
     }
 
     @Override

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameProcessStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameProcessStrategy.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategyFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategyFactory.java?rev=782560&r1=782559&r2=782560&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategyFactory.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategyFactory.java Mon Jun  8 08:48:39 2009
@@ -47,7 +47,7 @@
             strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
             return strategy;
         } else if (moveExpression != null || preMoveExpression != null) {
-            GenericFileRenameProcessStrategy strategy = new GenericFileRenameProcessStrategy();
+            FileRenameProcessStrategy strategy = new FileRenameProcessStrategy();
             strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
             if (moveExpression != null) {
                 GenericFileExpressionRenamer renamer = new GenericFileExpressionRenamer();

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java?rev=782560&r1=782559&r2=782560&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java Mon Jun  8 08:48:39 2009
@@ -73,10 +73,10 @@
         // delete local work file, if it was used (eg by ftp component)
         File local = exchange.getIn().getHeader(Exchange.FILE_LOCAL_WORK_PATH, File.class);
         if (local != null && local.exists()) {
+            boolean deleted = local.delete();
             if (log.isTraceEnabled()) {
-                log.trace("Deleting lock work file: " + local);
+                log.trace("Local work file: " + local + " was deleted: " + deleted);
             }
-            local.delete();
         }
     }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java?rev=782560&r1=782559&r2=782560&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java Mon Jun  8 08:48:39 2009
@@ -79,7 +79,7 @@
         return true;
     }
 
-    public void releaseExclusiveReadLock(GenericFileOperations<T> opeations, GenericFile<T> file,
+    public void releaseExclusiveReadLock(GenericFileOperations<T> operations, GenericFile<T> file,
                                          Exchange exchange) throws Exception {
         // noop
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java?rev=782560&r1=782559&r2=782560&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java Mon Jun  8 08:48:39 2009
@@ -59,8 +59,8 @@
         }
     }
 
-    public void releaseExclusiveReadLock(GenericFileOperations<File> fileGenericFileOperations,
-                                         GenericFile<File> fileGenericFile, Exchange exchange) throws Exception {
+    public void releaseExclusiveReadLock(GenericFileOperations<File> operations,
+                                         GenericFile<File> file, Exchange exchange) throws Exception {
         FileLock lock = ExchangeHelper.getMandatoryProperty(exchange, "CamelFileLock", FileLock.class);
         String lockFileName = ExchangeHelper.getMandatoryProperty(exchange, "CamelFileLockName", String.class);
         Channel channel = lock.channel();

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressTest.java?rev=782560&r1=782559&r2=782560&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressTest.java Mon Jun  8 08:48:39 2009
@@ -52,7 +52,7 @@
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                // leverage the fact that we can limit to max 25 files per poll
+                // leverage the fact that we can limit to max 50 files per poll
                 // this will result in polling again and potentially picking up files
                 // that already are in progress
                 from("file:target/filestress?maxMessagesPerPoll=50")

Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java?rev=782560&r1=782559&r2=782560&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java Mon Jun  8 08:48:39 2009
@@ -59,8 +59,14 @@
             } else if (file.isFile()) {
                 RemoteFile<FTPFile> remote = asRemoteFile(fileName, file);
                 if (isValidFile(remote, false)) {
-                    // matched file so add
-                    fileList.add(remote);
+                    if (isInProgress(remote)) {
+                        if (log.isTraceEnabled()) {
+                            log.trace("Skipping as file is already in progress: " + remote.getFileName());
+                        }
+                    } else {
+                        // matched file so add
+                        fileList.add(remote);
+                    }
                 }
             } else {
                 log.debug("Ignoring unsupported remote file type: " + file);

Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java?rev=782560&r1=782559&r2=782560&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java Mon Jun  8 08:48:39 2009
@@ -61,8 +61,14 @@
             } else {
                 RemoteFile<ChannelSftp.LsEntry> remote = asRemoteFile(fileName, file);
                 if (isValidFile(remote, false)) {
-                    // matched file so add
-                    fileList.add(remote);
+                    if (isInProgress(remote)) {
+                        if (log.isTraceEnabled()) {
+                            log.trace("Skipping as file is already in progress: " + remote.getFileName());
+                        }
+                    } else {
+                        // matched file so add
+                        fileList.add(remote);
+                    }
                 }
             }
         }

Copied: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerAsyncStressTest.java (from r782538, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerBodyAsStringTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerAsyncStressTest.java?p2=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerAsyncStressTest.java&p1=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerBodyAsStringTest.java&r1=782538&r2=782560&rev=782560&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerBodyAsStringTest.java (original)
+++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerAsyncStressTest.java Mon Jun  8 08:48:39 2009
@@ -16,59 +16,58 @@
  */
 package org.apache.camel.component.file.remote;
 
-import org.apache.camel.Endpoint;
+import java.util.Random;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.Producer;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 
 /**
  * @version $Revision$
  */
-public class FtpConsumerBodyAsStringTest extends FtpServerTestSupport {
+public class FtpConsumerAsyncStressTest extends FtpServerTestSupport {
+
+    private int files = 100;
 
     private String getFtpUrl() {
-        return "ftp://admin@localhost:" + getPort() + "/tmp4/camel?password=admin&consumer.delay=5000";
+        return "ftp://admin@localhost:" + getPort() + "/filestress/?password=admin&maxMessagesPerPoll=25";
     }
 
     @Override
     protected void setUp() throws Exception {
+        deleteDirectory(FTP_ROOT_DIR + "filestress");
         super.setUp();
-        prepareFtpServer();
+        for (int i = 0; i < files; i++) {
+            template.sendBodyAndHeader("file://" + FTP_ROOT_DIR + "filestress", "Hello World", Exchange.FILE_NAME, i + ".txt");
+        }
     }
 
-    private void prepareFtpServer() throws Exception {
-        // prepares the FTP Server by creating a file on the server that we want to unit
-        // test that we can pool
-        Endpoint endpoint = context.getEndpoint(getFtpUrl());
-        Exchange exchange = endpoint.createExchange();
-        exchange.getIn().setBody("Hello World");
-        exchange.getIn().setHeader(Exchange.FILE_NAME, "hello.txt");
-        Producer producer = endpoint.createProducer();
-        producer.start();
-        producer.process(exchange);
-        producer.stop();
-    }
-
-    public void testSingleFileTest() throws Exception {
+    public void testFTPConsumerAsyncStress() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("Hello World");
-        mock.expectedMessageCount(1);
+        mock.expectedMinimumMessageCount(50);
 
         assertMockEndpointsSatisfied();
     }
 
+    @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
+            @Override
             public void configure() throws Exception {
-                from(getFtpUrl()).process(new Processor() {
-                    public void process(Exchange exchange) throws Exception {
-                        String body = exchange.getIn().getBody(String.class);
-                        assertNotNull(body);
-                        assertEquals("Hello World", body);
-                    }
-                }).to("mock:result");
+                // leverage the fact that we can limit to max 25 files per poll
+                // this will result in polling again and potentially picking up files
+                // that already are in progress
+                from(getFtpUrl())
+                    .threads(10)
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            // simulate some work with random time to complete
+                            Random ran = new Random();
+                            int delay = ran.nextInt(500) + 10;
+                            Thread.sleep(delay);
+                        }
+                    }).to("mock:result");
             }
         };
     }

Propchange: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerAsyncStressTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerAsyncStressTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentRefTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentRefTest.java?rev=782560&r1=782559&r2=782560&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentRefTest.java (original)
+++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentRefTest.java Mon Jun  8 08:48:39 2009
@@ -93,6 +93,10 @@
         public boolean remove(String key) {
             return true;
         }
+
+        public boolean confirm(String key) {
+            return true;
+        }
     }
 
 }
\ No newline at end of file