You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/09/05 13:00:58 UTC
[1/2] git commit: CAMEL-6671: FTP consumer. Added useList and
ignoreFileNotFound options to allow to download a single file without using
FTP LIST command which is needed in some use-cases,
such as user has no permission to do FTP LIST.
Updated Branches:
refs/heads/camel-2.12.x b56b1c56d -> c948c2b8d
refs/heads/master f4efabae3 -> d99d5940d
CAMEL-6671: FTP consumer. Added useList and ignoreFileNotFound options to allow downloading a single file without using the FTP LIST command, which is needed in some use cases, such as when the user has no permission to do FTP LIST.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d99d5940
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d99d5940
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d99d5940
Branch: refs/heads/master
Commit: d99d5940dd1a3d0a48ebd3382b57d2cfdb422008
Parents: f4efaba
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Sep 5 12:44:16 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Sep 5 12:49:26 2013 +0200
----------------------------------------------------------------------
.../component/file/GenericFileConsumer.java | 64 +++++++++++++----
.../camel/impl/ScheduledPollConsumer.java | 1 +
.../component/file/remote/FtpConsumer.java | 40 +++++++++--
.../file/remote/RemoteFileConfiguration.java | 35 ++++++++-
.../file/remote/RemoteFileConsumer.java | 4 +-
.../component/file/remote/SftpConsumer.java | 1 +
.../file/remote/FromFtpUseListFalseTest.java | 75 ++++++++++++++++++++
.../FtpConsumerTemplateUseListFalseTest.java | 74 +++++++++++++++++++
8 files changed, 271 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/d99d5940/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
index 0f8be8e..c55755e 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
@@ -171,6 +171,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
public int processBatch(Queue<Object> exchanges) {
int total = exchanges.size();
+ int answer = total;
// limit if needed
if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) {
@@ -191,19 +192,25 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
pendingExchanges = total - index - 1;
// process the current exchange
+ boolean started;
if (customProcessor != null) {
// use a custom processor
- customProcessExchange(exchange, customProcessor);
+ started = customProcessExchange(exchange, customProcessor);
} else {
// process the exchange regular
- processExchange(exchange);
+ started = processExchange(exchange);
+ }
+
+ // if we did not start processing the file then decrement the counter
+ if (!started) {
+ answer--;
}
}
// drain any in progress files as we are done with this batch
removeExcessiveInProgressFiles(CastUtils.cast((Deque<?>) exchanges, Exchange.class), 0);
- return total;
+ return answer;
}
protected void removeExcessiveInProgressFiles(Deque<Exchange> exchanges, int limit) {
@@ -277,11 +284,28 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
}
/**
+ * Whether to ignore if the file cannot be retrieved.
+ * <p/>
+ * By default a {@link GenericFileOperationFailedException} is thrown if the file cannot be retrieved.
+ * <p/>
+ * This method allows to suppress this and just ignore that.
+ *
+ * @param name the file name
+ * @param exchange the exchange
+ * @return <tt>true</tt> to ignore, <tt>false</tt> is the default.
+ */
+ protected boolean ignoreCannotRetrieveFile(String name, Exchange exchange) {
+ return false;
+ }
+
+ /**
* Processes the exchange
*
* @param exchange the exchange
+ * @return <tt>true</tt> if the file was started to be processed, <tt>false</tt> if the file was not started
+ * to be processed, for some reason (not found, or aborted etc)
*/
- protected void processExchange(final Exchange exchange) {
+ protected boolean processExchange(final Exchange exchange) {
GenericFile<T> file = getExchangeFileProperty(exchange);
log.trace("Processing file: {}", file);
@@ -303,7 +327,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
// begin returned false, so remove file from the in progress list as its no longer in progress
endpoint.getInProgressRepository().remove(absoluteFileName);
}
- return;
+ return false;
}
} catch (Exception e) {
// remove file from the in progress list due to failure
@@ -311,7 +335,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
String msg = endpoint + " cannot begin processing file: " + file + " due to: " + e.getMessage();
handleException(msg, e);
- return;
+ return false;
}
// must use file from exchange as it can be updated due the
@@ -328,10 +352,17 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
// retrieve the file and check it was a success
boolean retrieved = operations.retrieveFile(name, exchange);
if (!retrieved) {
- // throw exception to handle the problem with retrieving the file
- // then if the method return false or throws an exception is handled the same in here
- // as in both cases an exception is being thrown
- throw new GenericFileOperationFailedException("Cannot retrieve file: " + file + " from: " + endpoint);
+ if (ignoreCannotRetrieveFile(name, exchange)) {
+ log.trace("Cannot retrieve file {} maybe it does not exists. Ignorning.", name);
+ // remove file from the in progress list as we could not retrieve it, but should ignore
+ endpoint.getInProgressRepository().remove(absoluteFileName);
+ return false;
+ } else {
+ // throw exception to handle the problem with retrieving the file
+ // then if the method return false or throws an exception is handled the same in here
+ // as in both cases an exception is being thrown
+ throw new GenericFileOperationFailedException("Cannot retrieve file: " + file + " from: " + endpoint);
+ }
}
log.trace("Retrieved file: {} from: {}", name, endpoint);
@@ -368,6 +399,8 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
String msg = "Error processing file " + file + " due to " + e.getMessage();
handleException(msg, e);
}
+
+ return true;
}
/**
@@ -385,7 +418,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
* @param exchange the exchange
* @param processor the custom processor
*/
- protected void customProcessExchange(final Exchange exchange, final Processor processor) {
+ protected boolean customProcessExchange(final Exchange exchange, final Processor processor) {
GenericFile<T> file = getExchangeFileProperty(exchange);
log.trace("Custom processing file: {}", file);
@@ -407,6 +440,8 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
// as the name can be different when using preMove option
endpoint.getInProgressRepository().remove(absoluteFileName);
}
+
+ return true;
}
/**
@@ -508,7 +543,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
// use file expression for a simple dynamic file filter
if (endpoint.getFileName() != null) {
- evaluateFileExpression();
+ fileExpressionResult = evaluateFileExpression();
if (fileExpressionResult != null) {
if (!name.equals(fileExpressionResult)) {
return false;
@@ -557,12 +592,13 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
return !endpoint.getInProgressRepository().add(key);
}
- private void evaluateFileExpression() {
- if (fileExpressionResult == null) {
+ protected String evaluateFileExpression() {
+ if (fileExpressionResult == null && endpoint.getFileName() != null) {
// create a dummy exchange as Exchange is needed for expression evaluation
Exchange dummy = endpoint.createExchange();
fileExpressionResult = endpoint.getFileName().evaluate(dummy, String.class);
}
+ return fileExpressionResult;
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/camel/blob/d99d5940/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
index 6b34ac7..98b9e42 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
@@ -185,6 +185,7 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R
if (begin) {
retryCounter++;
polledMessages = poll();
+ LOG.trace("Polled {} messages", polledMessages);
if (polledMessages == 0 && isSendEmptyMessageWhenIdle()) {
// send an "empty" exchange
http://git-wip-us.apache.org/repos/asf/camel/blob/d99d5940/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
index 7b042ea..5d7dca5 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
@@ -16,8 +16,10 @@
*/
package org.apache.camel.component.file.remote;
+import java.util.ArrayList;
import java.util.List;
+import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.file.GenericFile;
import org.apache.camel.util.FileUtil;
@@ -83,11 +85,24 @@ public class FtpConsumer extends RemoteFileConsumer<FTPFile> {
}
log.trace("Polling directory: {}", dir);
- List<FTPFile> files;
- if (isStepwise()) {
- files = operations.listFiles();
+ List<FTPFile> files = null;
+ if (isUseList()) {
+ if (isStepwise()) {
+ files = operations.listFiles();
+ } else {
+ files = operations.listFiles(dir);
+ }
} else {
- files = operations.listFiles(dir);
+ // we cannot use the LIST command(s) so we can only poll a named file
+ // so create a pseudo file with that name
+ FTPFile file = new FTPFile();
+ file.setType(FTPFile.FILE_TYPE);
+ fileExpressionResult = evaluateFileExpression();
+ if (fileExpressionResult != null) {
+ file.setName(fileExpressionResult);
+ files = new ArrayList<FTPFile>(1);
+ files.add(file);
+ }
}
if (files == null || files.isEmpty()) {
@@ -149,6 +164,18 @@ public class FtpConsumer extends RemoteFileConsumer<FTPFile> {
return false;
}
+ @Override
+ protected boolean ignoreCannotRetrieveFile(String name, Exchange exchange) {
+ if (getEndpoint().getConfiguration().isIgnoreFileNotFound()) {
+ // error code 550 is file not found
+ int code = exchange.getIn().getHeader(FtpConstants.FTP_REPLY_CODE, 0, int.class);
+ if (code == 550) {
+ return true;
+ }
+ }
+ return super.ignoreCannotRetrieveFile(name, exchange);
+ }
+
private RemoteFile<FTPFile> asRemoteFile(String absolutePath, FTPFile file) {
RemoteFile<FTPFile> answer = new RemoteFile<FTPFile>();
@@ -193,6 +220,11 @@ public class FtpConsumer extends RemoteFileConsumer<FTPFile> {
return config.isStepwise();
}
+ private boolean isUseList() {
+ RemoteFileConfiguration config = (RemoteFileConfiguration) endpoint.getConfiguration();
+ return config.isUseList();
+ }
+
@Override
public String toString() {
return "FtpConsumer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]";
http://git-wip-us.apache.org/repos/asf/camel/blob/d99d5940/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java
index e5325ba..bbe0a18 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java
@@ -33,7 +33,7 @@ public abstract class RemoteFileConfiguration extends GenericFileConfiguration {
* Windows = Path separator \ is used
* Auto = Use existing path separator in file name
*/
- public enum PathSeparator { UNIX, Windows, Auto };
+ public enum PathSeparator { UNIX, Windows, Auto }
private String protocol;
private String username;
@@ -50,6 +50,8 @@ public abstract class RemoteFileConfiguration extends GenericFileConfiguration {
private boolean stepwise = true;
private PathSeparator separator = PathSeparator.Auto;
private boolean streamDownload;
+ private boolean useList = true;
+ private boolean ignoreFileNotFound;
public RemoteFileConfiguration() {
}
@@ -270,13 +272,40 @@ public abstract class RemoteFileConfiguration extends GenericFileConfiguration {
* Sets the download method to use when not using a local working directory. If set to true,
* the remote files are streamed to the route as they are read. When set to false, the remote files
* are loaded into memory before being sent into the route.
- *
- * @param streamDownload
*/
public void setStreamDownload(boolean streamDownload) {
this.streamDownload = streamDownload;
}
+ public boolean isUseList() {
+ return useList;
+ }
+
+ /**
+ * Whether to allow using LIST command when downloading a file.
+ * <p/>
+ * Default is <tt>true</tt>. In some use cases you may want to download
+ * a specific file and are not allowed to use the LIST command, and therefore
+ * you can set this option to <tt>false</tt>.
+ */
+ public void setUseList(boolean useList) {
+ this.useList = useList;
+ }
+
+ public boolean isIgnoreFileNotFound() {
+ return ignoreFileNotFound;
+ }
+
+ /**
+ * Whether to ignore when trying to download a file which does not exist.
+ * <p/>
+ * By default, when a file does not exist, an exception is thrown.
+ * Setting this option to <tt>true</tt> ignores that instead.
+ */
+ public void setIgnoreFileNotFound(boolean ignoreFileNotFound) {
+ this.ignoreFileNotFound = ignoreFileNotFound;
+ }
+
/**
* Normalizes the given path according to the configured path separator.
*
http://git-wip-us.apache.org/repos/asf/camel/blob/d99d5940/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
index 39e23ce..de242da 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
@@ -92,11 +92,11 @@ public abstract class RemoteFileConsumer<T> extends GenericFileConsumer<T> {
}
@Override
- protected void processExchange(Exchange exchange) {
+ protected boolean processExchange(Exchange exchange) {
// mark the exchange to be processed synchronously as the ftp client is not thread safe
// and we must execute the callbacks in the same thread as this consumer
exchange.setProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC, Boolean.TRUE);
- super.processExchange(exchange);
+ return super.processExchange(exchange);
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/d99d5940/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
index 1dbfd9d..765aac4 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.file.remote;
import java.util.List;
import com.jcraft.jsch.ChannelSftp;
+import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.file.GenericFile;
import org.apache.camel.util.FileUtil;
http://git-wip-us.apache.org/repos/asf/camel/blob/d99d5940/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpUseListFalseTest.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpUseListFalseTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpUseListFalseTest.java
new file mode 100644
index 0000000..afd2763
--- /dev/null
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpUseListFalseTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test to poll a fixed file from the FTP server without using the list command.
+ */
+public class FromFtpUseListFalseTest extends FtpServerTestSupport {
+
+ private String getFtpUrl() {
+ return "ftp://admin@localhost:" + getPort() + "/nolist/?password=admin"
+ + "&stepwise=false&useList=false&ignoreFileNotFound=true&fileName=report.txt&delete=true";
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ prepareFtpServer();
+ }
+
+ @Test
+ public void testUseListFalse() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hello World from FTPServer");
+
+ // just allow to poll a few more times, but we should only get the file once
+ Thread.sleep(1000);
+
+ mock.assertIsSatisfied();
+ }
+
+ private void prepareFtpServer() throws Exception {
+ // prepares the FTP Server by creating a file on the server that we want to unit
+ // test that we can poll and store as a local file
+ Endpoint endpoint = context.getEndpoint("ftp://admin@localhost:" + getPort() + "/nolist/?password=admin&binary=false");
+ Exchange exchange = endpoint.createExchange();
+ exchange.getIn().setBody("Hello World from FTPServer");
+ exchange.getIn().setHeader(Exchange.FILE_NAME, "report.txt");
+ Producer producer = endpoint.createProducer();
+ producer.start();
+ producer.process(exchange);
+ producer.stop();
+ }
+
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ from(getFtpUrl()).to("mock:result");
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/d99d5940/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerTemplateUseListFalseTest.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerTemplateUseListFalseTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerTemplateUseListFalseTest.java
new file mode 100644
index 0000000..6554e62
--- /dev/null
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerTemplateUseListFalseTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Producer;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test to poll a fixed file from the FTP server without using the list command.
+ */
+public class FtpConsumerTemplateUseListFalseTest extends FtpServerTestSupport {
+
+ private String getFtpUrl() {
+ return "ftp://admin@localhost:" + getPort() + "/nolist/?password=admin"
+ + "&stepwise=false&useList=false&ignoreFileNotFound=true&delete=true";
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ prepareFtpServer();
+ }
+
+ @Test
+ public void testUseListFalse() throws Exception {
+ String data = consumer.receiveBody(getFtpUrl() + "&fileName=report.txt", 5000, String.class);
+ assertEquals("Hello World from FTPServer", data);
+
+ // try a 2nd time and the file is deleted
+ data = consumer.receiveBody(getFtpUrl() + "&fileName=report.txt", 1000, String.class);
+ assertNull("The file should no longer exist", data);
+
+ // and try a non existing file name
+ data = consumer.receiveBody(getFtpUrl() + "&fileName=report2.txt", 1000, String.class);
+ assertNull("The file should no longer exist", data);
+ }
+
+ private void prepareFtpServer() throws Exception {
+ // prepares the FTP Server by creating a file on the server that we want to unit
+ // test that we can poll and store as a local file
+ Endpoint endpoint = context.getEndpoint("ftp://admin@localhost:" + getPort() + "/nolist/?password=admin&binary=false");
+ Exchange exchange = endpoint.createExchange();
+ exchange.getIn().setBody("Hello World from FTPServer");
+ exchange.getIn().setHeader(Exchange.FILE_NAME, "report.txt");
+ Producer producer = endpoint.createProducer();
+ producer.start();
+ producer.process(exchange);
+ producer.stop();
+ }
+
+ @Override
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
+
+}
[2/2] git commit: CAMEL-6671: FTP consumer. Added useList and
ignoreFileNotFound options to allow to download a single file without using
FTP LIST command which is needed in some use-cases,
such as user has no permission to do FTP LIST.
Posted by da...@apache.org.
CAMEL-6671: FTP consumer. Added useList and ignoreFileNotFound options to allow downloading a single file without using the FTP LIST command, which is needed in some use cases, such as when the user has no permission to do FTP LIST.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c948c2b8
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c948c2b8
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c948c2b8
Branch: refs/heads/camel-2.12.x
Commit: c948c2b8d237c1b191ede44491160b12478c027b
Parents: b56b1c5
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Sep 5 12:44:16 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Sep 5 13:00:43 2013 +0200
----------------------------------------------------------------------
.../component/file/GenericFileConsumer.java | 64 +++++++++++++----
.../camel/impl/ScheduledPollConsumer.java | 1 +
.../component/file/remote/FtpConsumer.java | 40 +++++++++--
.../file/remote/RemoteFileConfiguration.java | 35 ++++++++-
.../file/remote/RemoteFileConsumer.java | 4 +-
.../component/file/remote/SftpConsumer.java | 1 +
.../file/remote/FromFtpUseListFalseTest.java | 75 ++++++++++++++++++++
.../FtpConsumerTemplateUseListFalseTest.java | 74 +++++++++++++++++++
8 files changed, 271 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/c948c2b8/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
index 0f8be8e..c55755e 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
@@ -171,6 +171,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
public int processBatch(Queue<Object> exchanges) {
int total = exchanges.size();
+ int answer = total;
// limit if needed
if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) {
@@ -191,19 +192,25 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
pendingExchanges = total - index - 1;
// process the current exchange
+ boolean started;
if (customProcessor != null) {
// use a custom processor
- customProcessExchange(exchange, customProcessor);
+ started = customProcessExchange(exchange, customProcessor);
} else {
// process the exchange regular
- processExchange(exchange);
+ started = processExchange(exchange);
+ }
+
+ // if we did not start processing the file then decrement the counter
+ if (!started) {
+ answer--;
}
}
// drain any in progress files as we are done with this batch
removeExcessiveInProgressFiles(CastUtils.cast((Deque<?>) exchanges, Exchange.class), 0);
- return total;
+ return answer;
}
protected void removeExcessiveInProgressFiles(Deque<Exchange> exchanges, int limit) {
@@ -277,11 +284,28 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
}
/**
+ * Whether to ignore if the file cannot be retrieved.
+ * <p/>
+ * By default a {@link GenericFileOperationFailedException} is thrown if the file cannot be retrieved.
+ * <p/>
+ * This method allows to suppress this and just ignore that.
+ *
+ * @param name the file name
+ * @param exchange the exchange
+ * @return <tt>true</tt> to ignore, <tt>false</tt> is the default.
+ */
+ protected boolean ignoreCannotRetrieveFile(String name, Exchange exchange) {
+ return false;
+ }
+
+ /**
* Processes the exchange
*
* @param exchange the exchange
+ * @return <tt>true</tt> if the file was started to be processed, <tt>false</tt> if the file was not started
+ * to be processed, for some reason (not found, or aborted etc)
*/
- protected void processExchange(final Exchange exchange) {
+ protected boolean processExchange(final Exchange exchange) {
GenericFile<T> file = getExchangeFileProperty(exchange);
log.trace("Processing file: {}", file);
@@ -303,7 +327,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
// begin returned false, so remove file from the in progress list as its no longer in progress
endpoint.getInProgressRepository().remove(absoluteFileName);
}
- return;
+ return false;
}
} catch (Exception e) {
// remove file from the in progress list due to failure
@@ -311,7 +335,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
String msg = endpoint + " cannot begin processing file: " + file + " due to: " + e.getMessage();
handleException(msg, e);
- return;
+ return false;
}
// must use file from exchange as it can be updated due the
@@ -328,10 +352,17 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
// retrieve the file and check it was a success
boolean retrieved = operations.retrieveFile(name, exchange);
if (!retrieved) {
- // throw exception to handle the problem with retrieving the file
- // then if the method return false or throws an exception is handled the same in here
- // as in both cases an exception is being thrown
- throw new GenericFileOperationFailedException("Cannot retrieve file: " + file + " from: " + endpoint);
+ if (ignoreCannotRetrieveFile(name, exchange)) {
+ log.trace("Cannot retrieve file {} maybe it does not exists. Ignorning.", name);
+ // remove file from the in progress list as we could not retrieve it, but should ignore
+ endpoint.getInProgressRepository().remove(absoluteFileName);
+ return false;
+ } else {
+ // throw exception to handle the problem with retrieving the file
+ // then if the method return false or throws an exception is handled the same in here
+ // as in both cases an exception is being thrown
+ throw new GenericFileOperationFailedException("Cannot retrieve file: " + file + " from: " + endpoint);
+ }
}
log.trace("Retrieved file: {} from: {}", name, endpoint);
@@ -368,6 +399,8 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
String msg = "Error processing file " + file + " due to " + e.getMessage();
handleException(msg, e);
}
+
+ return true;
}
/**
@@ -385,7 +418,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
* @param exchange the exchange
* @param processor the custom processor
*/
- protected void customProcessExchange(final Exchange exchange, final Processor processor) {
+ protected boolean customProcessExchange(final Exchange exchange, final Processor processor) {
GenericFile<T> file = getExchangeFileProperty(exchange);
log.trace("Custom processing file: {}", file);
@@ -407,6 +440,8 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
// as the name can be different when using preMove option
endpoint.getInProgressRepository().remove(absoluteFileName);
}
+
+ return true;
}
/**
@@ -508,7 +543,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
// use file expression for a simple dynamic file filter
if (endpoint.getFileName() != null) {
- evaluateFileExpression();
+ fileExpressionResult = evaluateFileExpression();
if (fileExpressionResult != null) {
if (!name.equals(fileExpressionResult)) {
return false;
@@ -557,12 +592,13 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
return !endpoint.getInProgressRepository().add(key);
}
- private void evaluateFileExpression() {
- if (fileExpressionResult == null) {
+ protected String evaluateFileExpression() {
+ if (fileExpressionResult == null && endpoint.getFileName() != null) {
// create a dummy exchange as Exchange is needed for expression evaluation
Exchange dummy = endpoint.createExchange();
fileExpressionResult = endpoint.getFileName().evaluate(dummy, String.class);
}
+ return fileExpressionResult;
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/camel/blob/c948c2b8/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
index 6b34ac7..98b9e42 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
@@ -185,6 +185,7 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R
if (begin) {
retryCounter++;
polledMessages = poll();
+ LOG.trace("Polled {} messages", polledMessages);
if (polledMessages == 0 && isSendEmptyMessageWhenIdle()) {
// send an "empty" exchange
http://git-wip-us.apache.org/repos/asf/camel/blob/c948c2b8/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
index 7b042ea..5d7dca5 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
@@ -16,8 +16,10 @@
*/
package org.apache.camel.component.file.remote;
+import java.util.ArrayList;
import java.util.List;
+import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.file.GenericFile;
import org.apache.camel.util.FileUtil;
@@ -83,11 +85,24 @@ public class FtpConsumer extends RemoteFileConsumer<FTPFile> {
}
log.trace("Polling directory: {}", dir);
- List<FTPFile> files;
- if (isStepwise()) {
- files = operations.listFiles();
+ List<FTPFile> files = null;
+ if (isUseList()) {
+ if (isStepwise()) {
+ files = operations.listFiles();
+ } else {
+ files = operations.listFiles(dir);
+ }
} else {
- files = operations.listFiles(dir);
+ // we cannot use the LIST command(s) so we can only poll a named file
+ // so create a pseudo file with that name
+ FTPFile file = new FTPFile();
+ file.setType(FTPFile.FILE_TYPE);
+ fileExpressionResult = evaluateFileExpression();
+ if (fileExpressionResult != null) {
+ file.setName(fileExpressionResult);
+ files = new ArrayList<FTPFile>(1);
+ files.add(file);
+ }
}
if (files == null || files.isEmpty()) {
@@ -149,6 +164,18 @@ public class FtpConsumer extends RemoteFileConsumer<FTPFile> {
return false;
}
+ @Override
+ protected boolean ignoreCannotRetrieveFile(String name, Exchange exchange) {
+ if (getEndpoint().getConfiguration().isIgnoreFileNotFound()) {
+ // error code 550 is file not found
+ int code = exchange.getIn().getHeader(FtpConstants.FTP_REPLY_CODE, 0, int.class);
+ if (code == 550) {
+ return true;
+ }
+ }
+ return super.ignoreCannotRetrieveFile(name, exchange);
+ }
+
private RemoteFile<FTPFile> asRemoteFile(String absolutePath, FTPFile file) {
RemoteFile<FTPFile> answer = new RemoteFile<FTPFile>();
@@ -193,6 +220,11 @@ public class FtpConsumer extends RemoteFileConsumer<FTPFile> {
return config.isStepwise();
}
+ private boolean isUseList() {
+ RemoteFileConfiguration config = (RemoteFileConfiguration) endpoint.getConfiguration();
+ return config.isUseList();
+ }
+
@Override
public String toString() {
return "FtpConsumer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]";
http://git-wip-us.apache.org/repos/asf/camel/blob/c948c2b8/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java
index e5325ba..bbe0a18 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java
@@ -33,7 +33,7 @@ public abstract class RemoteFileConfiguration extends GenericFileConfiguration {
* Windows = Path separator \ is used
* Auto = Use existing path separator in file name
*/
- public enum PathSeparator { UNIX, Windows, Auto };
+ public enum PathSeparator { UNIX, Windows, Auto }
private String protocol;
private String username;
@@ -50,6 +50,8 @@ public abstract class RemoteFileConfiguration extends GenericFileConfiguration {
private boolean stepwise = true;
private PathSeparator separator = PathSeparator.Auto;
private boolean streamDownload;
+ private boolean useList = true;
+ private boolean ignoreFileNotFound;
public RemoteFileConfiguration() {
}
@@ -270,13 +272,40 @@ public abstract class RemoteFileConfiguration extends GenericFileConfiguration {
* Sets the download method to use when not using a local working directory. If set to true,
* the remote files are streamed to the route as they are read. When set to false, the remote files
* are loaded into memory before being sent into the route.
- *
- * @param streamDownload
*/
public void setStreamDownload(boolean streamDownload) {
this.streamDownload = streamDownload;
}
+ public boolean isUseList() {
+ return useList;
+ }
+
+ /**
+ * Whether to allow using LIST command when downloading a file.
+ * <p/>
+ * Default is <tt>true</tt>. In some use cases you may want to download
+ * a specific file and are not allowed to use the LIST command, and therefore
+ * you can set this option to <tt>false</tt>.
+ */
+ public void setUseList(boolean useList) {
+ this.useList = useList;
+ }
+
+ public boolean isIgnoreFileNotFound() {
+ return ignoreFileNotFound;
+ }
+
+ /**
+ * Whether to ignore when trying to download a file which does not exist.
+ * <p/>
+ * By default, when a file does not exist, an exception is thrown.
+ * Setting this option to <tt>true</tt> allows ignoring that instead.
+ */
+ public void setIgnoreFileNotFound(boolean ignoreFileNotFound) {
+ this.ignoreFileNotFound = ignoreFileNotFound;
+ }
+
/**
* Normalizes the given path according to the configured path separator.
*
http://git-wip-us.apache.org/repos/asf/camel/blob/c948c2b8/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
index 39e23ce..de242da 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
@@ -92,11 +92,11 @@ public abstract class RemoteFileConsumer<T> extends GenericFileConsumer<T> {
}
@Override
- protected void processExchange(Exchange exchange) {
+ protected boolean processExchange(Exchange exchange) {
// mark the exchange to be processed synchronously as the ftp client is not thread safe
// and we must execute the callbacks in the same thread as this consumer
exchange.setProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC, Boolean.TRUE);
- super.processExchange(exchange);
+ return super.processExchange(exchange);
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/c948c2b8/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
index 1dbfd9d..765aac4 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.file.remote;
import java.util.List;
import com.jcraft.jsch.ChannelSftp;
+import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.file.GenericFile;
import org.apache.camel.util.FileUtil;
http://git-wip-us.apache.org/repos/asf/camel/blob/c948c2b8/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpUseListFalseTest.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpUseListFalseTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpUseListFalseTest.java
new file mode 100644
index 0000000..afd2763
--- /dev/null
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpUseListFalseTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test to poll a fixed file from the FTP server without using the list command.
+ */
+public class FromFtpUseListFalseTest extends FtpServerTestSupport {
+
+ private String getFtpUrl() {
+ return "ftp://admin@localhost:" + getPort() + "/nolist/?password=admin"
+ + "&stepwise=false&useList=false&ignoreFileNotFound=true&fileName=report.txt&delete=true";
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ prepareFtpServer();
+ }
+
+ @Test
+ public void testUseListFalse() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hello World from FTPServer");
+
+ // just allow to poll a few more times, but we should only get the file once
+ Thread.sleep(1000);
+
+ mock.assertIsSatisfied();
+ }
+
+ private void prepareFtpServer() throws Exception {
+ // prepares the FTP Server by creating a file on the server that we want to unit
+ // test that we can poll and store as a local file
+ Endpoint endpoint = context.getEndpoint("ftp://admin@localhost:" + getPort() + "/nolist/?password=admin&binary=false");
+ Exchange exchange = endpoint.createExchange();
+ exchange.getIn().setBody("Hello World from FTPServer");
+ exchange.getIn().setHeader(Exchange.FILE_NAME, "report.txt");
+ Producer producer = endpoint.createProducer();
+ producer.start();
+ producer.process(exchange);
+ producer.stop();
+ }
+
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ from(getFtpUrl()).to("mock:result");
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/c948c2b8/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerTemplateUseListFalseTest.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerTemplateUseListFalseTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerTemplateUseListFalseTest.java
new file mode 100644
index 0000000..6554e62
--- /dev/null
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerTemplateUseListFalseTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Producer;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test to poll a fixed file from the FTP server without using the list command.
+ */
+public class FtpConsumerTemplateUseListFalseTest extends FtpServerTestSupport {
+
+ private String getFtpUrl() {
+ return "ftp://admin@localhost:" + getPort() + "/nolist/?password=admin"
+ + "&stepwise=false&useList=false&ignoreFileNotFound=true&delete=true";
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ prepareFtpServer();
+ }
+
+ @Test
+ public void testUseListFalse() throws Exception {
+ String data = consumer.receiveBody(getFtpUrl() + "&fileName=report.txt", 5000, String.class);
+ assertEquals("Hello World from FTPServer", data);
+
+ // try a 2nd time and the file is deleted
+ data = consumer.receiveBody(getFtpUrl() + "&fileName=report.txt", 1000, String.class);
+ assertNull("The file should no longer exist", data);
+
+ // and try a non existing file name
+ data = consumer.receiveBody(getFtpUrl() + "&fileName=report2.txt", 1000, String.class);
+ assertNull("The file should no longer exist", data);
+ }
+
+ private void prepareFtpServer() throws Exception {
+ // prepares the FTP Server by creating a file on the server that we want to unit
+ // test that we can poll and store as a local file
+ Endpoint endpoint = context.getEndpoint("ftp://admin@localhost:" + getPort() + "/nolist/?password=admin&binary=false");
+ Exchange exchange = endpoint.createExchange();
+ exchange.getIn().setBody("Hello World from FTPServer");
+ exchange.getIn().setHeader(Exchange.FILE_NAME, "report.txt");
+ Producer producer = endpoint.createProducer();
+ producer.start();
+ producer.process(exchange);
+ producer.stop();
+ }
+
+ @Override
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
+
+}