You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/04/14 10:22:01 UTC
svn commit: r933877 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/component/file/
camel-core/src/test/java/org/apache/camel/component/file/
camel-core/src/test/java/org/apache/camel/component/file/stress/
components/camel-ftp/src/test/ja...
Author: davsclaus
Date: Wed Apr 14 08:22:01 2010
New Revision: 933877
URL: http://svn.apache.org/viewvc?rev=933877&view=rev
Log:
CAMEL-2640: Fixed an issue where the file consumer, with recursive and noop enabled, would not pick up files with similar names in sibling folders.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRecursiveNoopTest.java (with props)
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpRecursiveNoopTest.java (with props)
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressTest.java
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=933877&r1=933876&r2=933877&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Wed Apr 14 08:22:01 2010
@@ -62,8 +62,8 @@ public abstract class GenericFileConsume
shutdownRunningTask = null;
pendingExchanges = 0;
- // before we poll is there anything we need to check ? Such as are we
- // connected to the FTP Server Still ?
+ // before we poll is there anything we need to check?
+ // such as are we connected to the FTP Server still?
if (!prePollCheck()) {
if (log.isDebugEnabled()) {
log.debug("Skipping pool as pre poll check returned false");
@@ -141,7 +141,7 @@ public abstract class GenericFileConsume
while (exchanges.size() > 0) {
Exchange exchange = (Exchange) exchanges.poll();
GenericFile<T> file = (GenericFile<T>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
- String key = file.getFileName();
+ String key = file.getAbsoluteFilePath();
endpoint.getInProgressRepository().remove(key);
}
}
@@ -215,6 +215,9 @@ public abstract class GenericFileConsume
log.trace("Processing file: " + file);
}
+ // must extract the absolute name before the begin strategy as the file could potentially be pre moved
+ // and then the file name would be changed
+ String absoluteFileName = file.getAbsoluteFilePath();
try {
final GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy();
@@ -224,7 +227,7 @@ public abstract class GenericFileConsume
log.debug(endpoint + " cannot begin processing file: " + file);
}
// remove file from the in progress list as its no longer in progress
- endpoint.getInProgressRepository().remove(file.getFileName());
+ endpoint.getInProgressRepository().remove(absoluteFileName);
return;
}
@@ -251,8 +254,7 @@ public abstract class GenericFileConsume
// register on completion callback that does the completion strategies
// (for instance to move the file after we have processed it)
- String originalFileName = file.getFileName();
- exchange.addOnCompletion(new GenericFileOnCompletion<T>(endpoint, operations, target, originalFileName));
+ exchange.addOnCompletion(new GenericFileOnCompletion<T>(endpoint, operations, target, absoluteFileName));
// process the exchange
getProcessor().process(exchange);
@@ -263,10 +265,9 @@ public abstract class GenericFileConsume
}
/**
- * Strategy for validating if the given remote file should be included or
- * not
+ * Strategy for validating if the given remote file should be included or not
*
- * @param file the remote file
+ * @param file the file
* @param isDirectory whether the file is a directory or a file
* @return <tt>true</tt> to include the file, <tt>false</tt> to skip it
*/
@@ -277,8 +278,7 @@ public abstract class GenericFileConsume
log.trace("File did not match. Will skip this file: " + file);
}
return false;
- } else if (endpoint.isIdempotent() && endpoint.getIdempotentRepository().contains(file.getFileName())) {
- // only use the filename as the key as the file could be moved into a done folder
+ } else if (endpoint.isIdempotent() && endpoint.getIdempotentRepository().contains(file.getAbsoluteFilePath())) {
if (log.isTraceEnabled()) {
log.trace("This consumer is idempotent and the file has been consumed before. Will skip this file: " + file);
}
@@ -359,7 +359,7 @@ public abstract class GenericFileConsume
* @return <tt>true</tt> if the file is already in progress
*/
protected boolean isInProgress(GenericFile<T> file) {
- String key = file.getFileName();
+ String key = file.getAbsoluteFilePath();
return !endpoint.getInProgressRepository().add(key);
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java?rev=933877&r1=933876&r2=933877&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java Wed Apr 14 08:22:01 2010
@@ -38,14 +38,14 @@ public class GenericFileOnCompletion<T>
private GenericFileOperations<T> operations;
private ExceptionHandler exceptionHandler;
private GenericFile<T> file;
- private String originalFileName;
+ private String absoluteFileName;
public GenericFileOnCompletion(GenericFileEndpoint<T> endpoint, GenericFileOperations<T> operations,
- GenericFile<T> file, String originalFileName) {
+ GenericFile<T> file, String absoluteFileName) {
this.endpoint = endpoint;
this.operations = operations;
this.file = file;
- this.originalFileName = originalFileName;
+ this.absoluteFileName = absoluteFileName;
}
public void onComplete(Exchange exchange) {
@@ -96,7 +96,7 @@ public class GenericFileOnCompletion<T>
// remove file from the in progress list as its no longer in progress
// use the original file name that was used to add it to the repository
// as the name can be different when using preMove option
- endpoint.getInProgressRepository().remove(originalFileName);
+ endpoint.getInProgressRepository().remove(absoluteFileName);
}
}
@@ -111,8 +111,7 @@ public class GenericFileOnCompletion<T>
Exchange exchange, GenericFile<T> file) {
if (endpoint.isIdempotent()) {
// only add to idempotent repository if we could process the file
- // only use the filename as the key as the file could be moved into a done folder
- endpoint.getIdempotentRepository().add(file.getFileName());
+ endpoint.getIdempotentRepository().add(absoluteFileName);
}
try {
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRecursiveNoopTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRecursiveNoopTest.java?rev=933877&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRecursiveNoopTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRecursiveNoopTest.java Wed Apr 14 08:22:01 2010
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version $Revision$
+ */
+public class FileRecursiveNoopTest extends ContextTestSupport {
+
+ @Override
+ protected void setUp() throws Exception {
+ deleteDirectory("target/noop");
+ super.setUp();
+ }
+
+ public void testRecursiveNoop() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceivedInAnyOrder("a", "b", "a2", "c", "b2");
+
+ template.sendBodyAndHeader("file:target/noop", "a", Exchange.FILE_NAME, "a.txt");
+ template.sendBodyAndHeader("file:target/noop", "b", Exchange.FILE_NAME, "b.txt");
+ template.sendBodyAndHeader("file:target/noop/foo", "a2", Exchange.FILE_NAME, "a.txt");
+ template.sendBodyAndHeader("file:target/noop/bar", "c", Exchange.FILE_NAME, "c.txt");
+ template.sendBodyAndHeader("file:target/noop/bar", "b2", Exchange.FILE_NAME, "b.txt");
+
+ assertMockEndpointsSatisfied();
+
+ // reset mock and send in a new file to be picked up only
+ mock.reset();
+ mock.expectedBodiesReceived("c2");
+
+ template.sendBodyAndHeader("file:target/noop", "c2", Exchange.FILE_NAME, "c.txt");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("file:target/noop?recursive=true&noop=true")
+ .convertBodyTo(String.class)
+ .to("mock:result");
+ }
+ };
+ }
+}
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRecursiveNoopTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRecursiveNoopTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressTest.java?rev=933877&r1=933876&r2=933877&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressTest.java Wed Apr 14 08:22:01 2010
@@ -43,6 +43,7 @@ public class FileAsyncStressTest extends
public void testAsyncStress() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedMinimumMessageCount(100);
+ mock.setResultWaitTime(20000);
assertMockEndpointsSatisfied();
}
@@ -61,7 +62,7 @@ public class FileAsyncStressTest extends
public void process(Exchange exchange) throws Exception {
// simulate some work with random time to complete
Random ran = new Random();
- int delay = ran.nextInt(250) + 10;
+ int delay = ran.nextInt(50) + 10;
Thread.sleep(delay);
}
}).to("mock:result");
Added: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpRecursiveNoopTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpRecursiveNoopTest.java?rev=933877&view=auto
==============================================================================
--- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpRecursiveNoopTest.java (added)
+++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpRecursiveNoopTest.java Wed Apr 14 08:22:01 2010
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class FromFtpRecursiveNoopTest extends FtpServerTestSupport {
+
+ private String getFtpUrl() {
+ return "ftp://admin@localhost:" + getPort() + "/noop?password=admin&binary=false"
+ + "&recursive=true&noop=true";
+ }
+
+ @Test
+ public void testRecursiveNoop() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceivedInAnyOrder("a", "b", "a2", "c", "b2");
+
+ template.sendBodyAndHeader(getFtpUrl(), "a", Exchange.FILE_NAME, "a.txt");
+ template.sendBodyAndHeader(getFtpUrl(), "b", Exchange.FILE_NAME, "b.txt");
+ template.sendBodyAndHeader(getFtpUrl(), "a2", Exchange.FILE_NAME, "foo/a.txt");
+ template.sendBodyAndHeader(getFtpUrl(), "c", Exchange.FILE_NAME, "bar/c.txt");
+ template.sendBodyAndHeader(getFtpUrl(), "b2", Exchange.FILE_NAME, "bar/b.txt");
+
+ assertMockEndpointsSatisfied();
+
+ // reset mock and send in a new file to be picked up only
+ mock.reset();
+ mock.expectedBodiesReceived("c2");
+
+ template.sendBodyAndHeader(getFtpUrl(), "c2", Exchange.FILE_NAME, "c.txt");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from(getFtpUrl())
+ .convertBodyTo(String.class)
+ .to("mock:result");
+ }
+ };
+ }
+
+
+}
Propchange: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpRecursiveNoopTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpRecursiveNoopTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java?rev=933877&r1=933876&r2=933877&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java Wed Apr 14 08:22:01 2010
@@ -23,6 +23,8 @@ import org.apache.camel.ContextTestSuppo
import org.apache.camel.Exchange;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.spi.IdempotentRepository;
+import org.apache.camel.util.FileUtil;
+
import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
public class FileConsumerIdempotentTest extends ContextTestSupport {
@@ -43,8 +45,6 @@ public class FileConsumerIdempotentTest
public void testIdempotent() throws Exception {
- assertFalse(repo.contains("report.txt"));
-
// send a file
template.sendBodyAndHeader("file://target/fileidempotent/", "Hello World", Exchange.FILE_NAME, "report.txt");
@@ -69,7 +69,9 @@ public class FileConsumerIdempotentTest
// should NOT consume the file again, let 2 secs pass to let the consumer try to consume it but it should not
assertMockEndpointsSatisfied();
- assertTrue(repo.contains("report.txt"));
+
+ String name = FileUtil.normalizePath(new File("target/fileidempotent/report.txt").getAbsolutePath());
+ assertTrue("Should contain file: " + name, repo.contains(name));
}
}