You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/04/14 10:22:01 UTC

svn commit: r933877 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/component/file/ camel-core/src/test/java/org/apache/camel/component/file/ camel-core/src/test/java/org/apache/camel/component/file/stress/ components/camel-ftp/src/test/ja...

Author: davsclaus
Date: Wed Apr 14 08:22:01 2010
New Revision: 933877

URL: http://svn.apache.org/viewvc?rev=933877&view=rev
Log:
CAMEL-2640: Fixed file consumer with recursive and noop enable would not pick files with similar name in sibling folders.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRecursiveNoopTest.java   (with props)
    camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpRecursiveNoopTest.java   (with props)
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressTest.java
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=933877&r1=933876&r2=933877&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Wed Apr 14 08:22:01 2010
@@ -62,8 +62,8 @@ public abstract class GenericFileConsume
         shutdownRunningTask = null;
         pendingExchanges = 0;
 
-        // before we poll is there anything we need to check ? Such as are we
-        // connected to the FTP Server Still ?
+        // before we poll is there anything we need to check?
+        // such as are we connected to the FTP Server still?
         if (!prePollCheck()) {
             if (log.isDebugEnabled()) {
                 log.debug("Skipping pool as pre poll check returned false");
@@ -141,7 +141,7 @@ public abstract class GenericFileConsume
         while (exchanges.size() > 0) {
             Exchange exchange = (Exchange) exchanges.poll();
             GenericFile<T> file = (GenericFile<T>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
-            String key = file.getFileName();
+            String key = file.getAbsoluteFilePath();
             endpoint.getInProgressRepository().remove(key);
         }
     }
@@ -215,6 +215,9 @@ public abstract class GenericFileConsume
             log.trace("Processing file: " + file);
         }
 
+        // must extract the absolute name before the begin strategy as the file could potentially be pre moved
+        // and then the file name would be changed
+        String absoluteFileName = file.getAbsoluteFilePath();
         try {
             final GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy();
 
@@ -224,7 +227,7 @@ public abstract class GenericFileConsume
                     log.debug(endpoint + " cannot begin processing file: " + file);
                 }
                 // remove file from the in progress list as its no longer in progress
-                endpoint.getInProgressRepository().remove(file.getFileName());
+                endpoint.getInProgressRepository().remove(absoluteFileName);
                 return;
             }
 
@@ -251,8 +254,7 @@ public abstract class GenericFileConsume
 
             // register on completion callback that does the completion strategies
             // (for instance to move the file after we have processed it)
-            String originalFileName = file.getFileName();
-            exchange.addOnCompletion(new GenericFileOnCompletion<T>(endpoint, operations, target, originalFileName));
+            exchange.addOnCompletion(new GenericFileOnCompletion<T>(endpoint, operations, target, absoluteFileName));
 
             // process the exchange
             getProcessor().process(exchange);
@@ -263,10 +265,9 @@ public abstract class GenericFileConsume
     }
 
     /**
-     * Strategy for validating if the given remote file should be included or
-     * not
+     * Strategy for validating if the given remote file should be included or not
      *
-     * @param file        the remote file
+     * @param file        the file
      * @param isDirectory whether the file is a directory or a file
      * @return <tt>true</tt> to include the file, <tt>false</tt> to skip it
      */
@@ -277,8 +278,7 @@ public abstract class GenericFileConsume
                 log.trace("File did not match. Will skip this file: " + file);
             }
             return false;
-        } else if (endpoint.isIdempotent() && endpoint.getIdempotentRepository().contains(file.getFileName())) {
-            // only use the filename as the key as the file could be moved into a done folder
+        } else if (endpoint.isIdempotent() && endpoint.getIdempotentRepository().contains(file.getAbsoluteFilePath())) {
             if (log.isTraceEnabled()) {
                 log.trace("This consumer is idempotent and the file has been consumed before. Will skip this file: " + file);
             }
@@ -359,7 +359,7 @@ public abstract class GenericFileConsume
      * @return <tt>true</tt> if the file is already in progress
      */
     protected boolean isInProgress(GenericFile<T> file) {
-        String key = file.getFileName();
+        String key = file.getAbsoluteFilePath();
         return !endpoint.getInProgressRepository().add(key);
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java?rev=933877&r1=933876&r2=933877&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java Wed Apr 14 08:22:01 2010
@@ -38,14 +38,14 @@ public class GenericFileOnCompletion<T> 
     private GenericFileOperations<T> operations;
     private ExceptionHandler exceptionHandler;
     private GenericFile<T> file;
-    private String originalFileName;
+    private String absoluteFileName;
 
     public GenericFileOnCompletion(GenericFileEndpoint<T> endpoint, GenericFileOperations<T> operations,
-                                   GenericFile<T> file, String originalFileName) {
+                                   GenericFile<T> file, String absoluteFileName) {
         this.endpoint = endpoint;
         this.operations = operations;
         this.file = file;
-        this.originalFileName = originalFileName;
+        this.absoluteFileName = absoluteFileName;
     }
 
     public void onComplete(Exchange exchange) {
@@ -96,7 +96,7 @@ public class GenericFileOnCompletion<T> 
             // remove file from the in progress list as its no longer in progress
             // use the original file name that was used to add it to the repository
             // as the name can be different when using preMove option
-            endpoint.getInProgressRepository().remove(originalFileName);
+            endpoint.getInProgressRepository().remove(absoluteFileName);
         }
     }
 
@@ -111,8 +111,7 @@ public class GenericFileOnCompletion<T> 
                                          Exchange exchange, GenericFile<T> file) {
         if (endpoint.isIdempotent()) {
             // only add to idempotent repository if we could process the file
-            // only use the filename as the key as the file could be moved into a done folder
-            endpoint.getIdempotentRepository().add(file.getFileName());
+            endpoint.getIdempotentRepository().add(absoluteFileName);
         }
 
         try {

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRecursiveNoopTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRecursiveNoopTest.java?rev=933877&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRecursiveNoopTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRecursiveNoopTest.java Wed Apr 14 08:22:01 2010
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version $Revision$
+ */
+public class FileRecursiveNoopTest extends ContextTestSupport {
+
+    @Override
+    protected void setUp() throws Exception {
+        deleteDirectory("target/noop");
+        super.setUp();
+    }
+
+    public void testRecursiveNoop() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceivedInAnyOrder("a", "b", "a2", "c", "b2");
+
+        template.sendBodyAndHeader("file:target/noop", "a", Exchange.FILE_NAME, "a.txt");
+        template.sendBodyAndHeader("file:target/noop", "b", Exchange.FILE_NAME, "b.txt");
+        template.sendBodyAndHeader("file:target/noop/foo", "a2", Exchange.FILE_NAME, "a.txt");
+        template.sendBodyAndHeader("file:target/noop/bar", "c", Exchange.FILE_NAME, "c.txt");
+        template.sendBodyAndHeader("file:target/noop/bar", "b2", Exchange.FILE_NAME, "b.txt");
+
+        assertMockEndpointsSatisfied();
+
+        // reset mock and send in a new file to be picked up only
+        mock.reset();
+        mock.expectedBodiesReceived("c2");
+
+        template.sendBodyAndHeader("file:target/noop", "c2", Exchange.FILE_NAME, "c.txt");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("file:target/noop?recursive=true&noop=true")
+                    .convertBodyTo(String.class)
+                    .to("mock:result");
+            }
+        };
+    }
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRecursiveNoopTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRecursiveNoopTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressTest.java?rev=933877&r1=933876&r2=933877&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressTest.java Wed Apr 14 08:22:01 2010
@@ -43,6 +43,7 @@ public class FileAsyncStressTest extends
     public void testAsyncStress() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMinimumMessageCount(100);
+        mock.setResultWaitTime(20000);
 
         assertMockEndpointsSatisfied();
     }
@@ -61,7 +62,7 @@ public class FileAsyncStressTest extends
                         public void process(Exchange exchange) throws Exception {
                             // simulate some work with random time to complete
                             Random ran = new Random();
-                            int delay = ran.nextInt(250) + 10;
+                            int delay = ran.nextInt(50) + 10;
                             Thread.sleep(delay);
                         }
                     }).to("mock:result");

Added: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpRecursiveNoopTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpRecursiveNoopTest.java?rev=933877&view=auto
==============================================================================
--- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpRecursiveNoopTest.java (added)
+++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpRecursiveNoopTest.java Wed Apr 14 08:22:01 2010
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class FromFtpRecursiveNoopTest extends FtpServerTestSupport {
+
+    private String getFtpUrl() {
+        return "ftp://admin@localhost:" + getPort() + "/noop?password=admin&binary=false"
+                + "&recursive=true&noop=true";
+    }
+
+    @Test
+    public void testRecursiveNoop() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceivedInAnyOrder("a", "b", "a2", "c", "b2");
+
+        template.sendBodyAndHeader(getFtpUrl(), "a", Exchange.FILE_NAME, "a.txt");
+        template.sendBodyAndHeader(getFtpUrl(), "b", Exchange.FILE_NAME, "b.txt");
+        template.sendBodyAndHeader(getFtpUrl(), "a2", Exchange.FILE_NAME, "foo/a.txt");
+        template.sendBodyAndHeader(getFtpUrl(), "c", Exchange.FILE_NAME, "bar/c.txt");
+        template.sendBodyAndHeader(getFtpUrl(), "b2", Exchange.FILE_NAME, "bar/b.txt");
+
+        assertMockEndpointsSatisfied();
+
+        // reset mock and send in a new file to be picked up only
+        mock.reset();
+        mock.expectedBodiesReceived("c2");
+
+        template.sendBodyAndHeader(getFtpUrl(), "c2", Exchange.FILE_NAME, "c.txt");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from(getFtpUrl())
+                    .convertBodyTo(String.class)
+                    .to("mock:result");
+            }
+        };
+    }
+
+
+}

Propchange: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpRecursiveNoopTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpRecursiveNoopTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java?rev=933877&r1=933876&r2=933877&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java Wed Apr 14 08:22:01 2010
@@ -23,6 +23,8 @@ import org.apache.camel.ContextTestSuppo
 import org.apache.camel.Exchange;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.spi.IdempotentRepository;
+import org.apache.camel.util.FileUtil;
+
 import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
 
 public class FileConsumerIdempotentTest extends ContextTestSupport {
@@ -43,8 +45,6 @@ public class FileConsumerIdempotentTest 
 
 
     public void testIdempotent() throws Exception {
-        assertFalse(repo.contains("report.txt"));
-
         // send a file
         template.sendBodyAndHeader("file://target/fileidempotent/", "Hello World", Exchange.FILE_NAME, "report.txt");
 
@@ -69,7 +69,9 @@ public class FileConsumerIdempotentTest 
 
         // should NOT consume the file again, let 2 secs pass to let the consumer try to consume it but it should not
         assertMockEndpointsSatisfied();
-        assertTrue(repo.contains("report.txt"));
+
+        String name = FileUtil.normalizePath(new File("target/fileidempotent/report.txt").getAbsolutePath());
+        assertTrue("Should contain file: " + name, repo.contains(name));
     }
 
 }