You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/06/28 14:16:58 UTC
svn commit: r1140555 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/component/file/
components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/
Author: davsclaus
Date: Tue Jun 28 12:16:57 2011
New Revision: 1140555
URL: http://svn.apache.org/viewvc?rev=1140555&view=rev
Log:
CAMEL-3655: Reverted back of previous work on this as its not needed anymore.
Removed:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumerSupport.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1140555&r1=1140554&r2=1140555&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Tue Jun 28 12:16:57 2011
@@ -42,9 +42,9 @@ import org.slf4j.LoggerFactory;
*/
public abstract class GenericFileConsumer<T> extends ScheduledPollConsumer implements BatchConsumer, ShutdownAware {
protected final transient Logger log = LoggerFactory.getLogger(getClass());
- protected final ProcessFile processFile;
protected GenericFileEndpoint<T> endpoint;
protected GenericFileOperations<T> operations;
+ protected boolean loggedIn;
protected String fileExpressionResult;
protected int maxMessagesPerPoll;
protected volatile ShutdownRunningTask shutdownRunningTask;
@@ -54,13 +54,6 @@ public abstract class GenericFileConsume
super(endpoint, processor);
this.endpoint = endpoint;
this.operations = operations;
- this.processFile = new ProcessFile(this);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public GenericFileEndpoint<T> getEndpoint() {
- return (GenericFileEndpoint<T>) super.getEndpoint();
}
/**
@@ -81,7 +74,7 @@ public abstract class GenericFileConsume
// gather list of files to process
List<GenericFile<T>> files = new ArrayList<GenericFile<T>>();
- String name = getEndpoint().getConfiguration().getDirectory();
+ String name = endpoint.getConfiguration().getDirectory();
// time how long time it takes to poll
StopWatch stop = new StopWatch();
@@ -97,21 +90,21 @@ public abstract class GenericFileConsume
}
// sort files using file comparator if provided
- if (getEndpoint().getSorter() != null) {
- Collections.sort(files, getEndpoint().getSorter());
+ if (endpoint.getSorter() != null) {
+ Collections.sort(files, endpoint.getSorter());
}
// sort using build in sorters so we can use expressions
LinkedList<Exchange> exchanges = new LinkedList<Exchange>();
for (GenericFile<T> file : files) {
- Exchange exchange = getEndpoint().createExchange(file);
- getEndpoint().configureExchange(exchange);
- getEndpoint().configureMessage(file, exchange.getIn());
+ Exchange exchange = endpoint.createExchange(file);
+ endpoint.configureExchange(exchange);
+ endpoint.configureMessage(file, exchange.getIn());
exchanges.add(exchange);
}
// sort files using exchange comparator if provided
- if (getEndpoint().getSortBy() != null) {
- Collections.sort(exchanges, getEndpoint().getSortBy());
+ if (endpoint.getSortBy() != null) {
+ Collections.sort(exchanges, endpoint.getSortBy());
}
// consume files one by one
@@ -163,7 +156,7 @@ public abstract class GenericFileConsume
Exchange exchange = (Exchange) exchanges.poll();
GenericFile<T> file = (GenericFile<T>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
String key = file.getAbsoluteFilePath();
- getEndpoint().getInProgressRepository().remove(key);
+ endpoint.getInProgressRepository().remove(key);
}
return total;
@@ -258,22 +251,83 @@ public abstract class GenericFileConsume
}
/**
- * Gets the operations to be used
- *
- * @return the operations
- */
- public GenericFileOperations<T> getOperations() {
- return operations;
- }
-
- /**
* Processes the exchange
*
* @param exchange the exchange
*/
protected void processExchange(final Exchange exchange) {
- // let the process do the work
- processFile.processExchange(exchange);
+ GenericFile<T> file = getExchangeFileProperty(exchange);
+ log.trace("Processing file: {}", file);
+
+ // must extract the absolute name before the begin strategy as the file could potentially be pre moved
+ // and then the file name would be changed
+ String absoluteFileName = file.getAbsoluteFilePath();
+
+ // check if we can begin processing the file
+ try {
+ final GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy();
+
+ boolean begin = processStrategy.begin(operations, endpoint, exchange, file);
+ if (!begin) {
+ log.debug(endpoint + " cannot begin processing file: {}", file);
+ // begin returned false, so remove file from the in progress list as its no longer in progress
+ endpoint.getInProgressRepository().remove(absoluteFileName);
+ return;
+ }
+ } catch (Exception e) {
+ if (log.isDebugEnabled()) {
+ log.debug(endpoint + " cannot begin processing file: " + file + " due to: " + e.getMessage(), e);
+ }
+ endpoint.getInProgressRepository().remove(absoluteFileName);
+ return;
+ }
+
+ // must use file from exchange as it can be updated due the
+ // preMoveNamePrefix/preMoveNamePostfix options
+ final GenericFile<T> target = getExchangeFileProperty(exchange);
+ // must use full name when downloading so we have the correct path
+ final String name = target.getAbsoluteFilePath();
+ try {
+ // retrieve the file using the stream
+ log.trace("Retrieving file: {} from: {}", name, endpoint);
+
+ // retrieve the file and check it was a success
+ boolean retrieved = operations.retrieveFile(name, exchange);
+ if (!retrieved) {
+ // throw exception to handle the problem with retrieving the file
+ // then if the method return false or throws an exception is handled the same in here
+ // as in both cases an exception is being thrown
+ throw new GenericFileOperationFailedException("Cannot retrieve file: " + file + " from: " + endpoint);
+ }
+
+ log.trace("Retrieved file: {} from: {}", name, endpoint);
+
+ // register on completion callback that does the completion strategies
+ // (for instance to move the file after we have processed it)
+ exchange.addOnCompletion(new GenericFileOnCompletion<T>(endpoint, operations, target, absoluteFileName));
+
+ log.debug("About to process file: {} using exchange: {}", target, exchange);
+
+ // process the exchange using the async consumer to support async routing engine
+ // which can be supported by this file consumer as all the done work is
+ // provided in the GenericFileOnCompletion
+ getAsyncProcessor().process(exchange, new AsyncCallback() {
+ public void done(boolean doneSync) {
+ // noop
+ if (log.isTraceEnabled()) {
+ log.trace("Done processing file: {} {}", target, doneSync ? "synchronously" : "asynchronously");
+ }
+ }
+ });
+
+ } catch (Exception e) {
+ // remove file from the in progress list due to failure
+ // (cannot be in finally block due to GenericFileOnCompletion will remove it
+ // from in progress when it takes over and processes the file, which may happen
+ // by another thread at a later time. So its only safe to remove it if there was an exception)
+ endpoint.getInProgressRepository().remove(absoluteFileName);
+ handleException(e);
+ }
}
/**
@@ -287,7 +341,7 @@ public abstract class GenericFileConsume
if (!isMatched(file, isDirectory)) {
log.trace("File did not match. Will skip this file: {}", file);
return false;
- } else if (getEndpoint().isIdempotent() && getEndpoint().getIdempotentRepository().contains(file.getAbsoluteFilePath())) {
+ } else if (endpoint.isIdempotent() && endpoint.getIdempotentRepository().contains(file.getAbsoluteFilePath())) {
log.trace("This consumer is idempotent and the file has been consumed before. Will skip this file: {}", file);
return false;
}
@@ -328,26 +382,26 @@ public abstract class GenericFileConsume
return true;
}
- if (getEndpoint().getFilter() != null) {
- if (!getEndpoint().getFilter().accept(file)) {
+ if (endpoint.getFilter() != null) {
+ if (!endpoint.getFilter().accept(file)) {
return false;
}
}
- if (ObjectHelper.isNotEmpty(getEndpoint().getExclude())) {
- if (name.matches(getEndpoint().getExclude())) {
+ if (ObjectHelper.isNotEmpty(endpoint.getExclude())) {
+ if (name.matches(endpoint.getExclude())) {
return false;
}
}
- if (ObjectHelper.isNotEmpty(getEndpoint().getInclude())) {
- if (!name.matches(getEndpoint().getInclude())) {
+ if (ObjectHelper.isNotEmpty(endpoint.getInclude())) {
+ if (!name.matches(endpoint.getInclude())) {
return false;
}
}
// use file expression for a simple dynamic file filter
- if (getEndpoint().getFileName() != null) {
+ if (endpoint.getFileName() != null) {
evaluateFileExpression();
if (fileExpressionResult != null) {
if (!name.equals(fileExpressionResult)) {
@@ -357,19 +411,19 @@ public abstract class GenericFileConsume
}
// if done file name is enabled, then the file is only valid if a done file exists
- if (getEndpoint().getDoneFileName() != null) {
+ if (endpoint.getDoneFileName() != null) {
// done file must be in same path as the file
- String doneFileName = getEndpoint().createDoneFileName(file.getAbsoluteFilePath());
- ObjectHelper.notEmpty(doneFileName, "doneFileName", getEndpoint());
+ String doneFileName = endpoint.createDoneFileName(file.getAbsoluteFilePath());
+ ObjectHelper.notEmpty(doneFileName, "doneFileName", endpoint);
// is it a done file name?
- if (getEndpoint().isDoneFile(file.getFileNameOnly())) {
+ if (endpoint.isDoneFile(file.getFileNameOnly())) {
log.trace("Skipping done file: {}", file);
return false;
}
// the file is only valid if the done file exist
- if (!getOperations().existsFile(doneFileName)) {
+ if (!operations.existsFile(doneFileName)) {
log.trace("Done file: {} does not exist", doneFileName);
return false;
}
@@ -386,13 +440,13 @@ public abstract class GenericFileConsume
*/
protected boolean isInProgress(GenericFile<T> file) {
String key = file.getAbsoluteFilePath();
- return !getEndpoint().getInProgressRepository().add(key);
+ return !endpoint.getInProgressRepository().add(key);
}
private void evaluateFileExpression() {
if (fileExpressionResult == null) {
// create a dummy exchange as Exchange is needed for expression evaluation
- Exchange dummy = new DefaultExchange(getEndpoint().getCamelContext());
+ Exchange dummy = new DefaultExchange(endpoint.getCamelContext());
fileExpressionResult = endpoint.getFileName().evaluate(dummy, String.class);
}
}
@@ -407,35 +461,6 @@ public abstract class GenericFileConsume
super.doStart();
// prepare on startup
- getEndpoint().getGenericFileProcessStrategy().prepareOnStartup(getOperations(), getEndpoint());
+ endpoint.getGenericFileProcessStrategy().prepareOnStartup(operations, endpoint);
}
-
- /**
- * Class the processes the exchange when a file has been polled.
- */
- private class ProcessFile extends GenericFileConsumerSupport<T> {
-
- public ProcessFile(GenericFileConsumer<T> consumer) {
- super(consumer);
- }
-
- @Override
- void handleExceptionStrategy(Exception e) {
- // handle the exception on the consumer
- handleException(e);
- }
-
- @Override
- void processFileStrategy(Exchange exchange) {
- // process the exchange using the async consumer to support async routing engine
- // which can be supported by this file consumer as all the done work is
- // provided in the GenericFileOnCompletion
- getAsyncProcessor().process(exchange, new AsyncCallback() {
- public void done(boolean doneSync) {
- // noop
- }
- });
- }
- }
-
}
Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java?rev=1140555&r1=1140554&r2=1140555&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java Tue Jun 28 12:16:57 2011
@@ -39,7 +39,7 @@ public abstract class RemoteFileConsumer
return (RemoteFileEndpoint<T>) super.getEndpoint();
}
- public RemoteFileOperations getOperations() {
+ protected RemoteFileOperations getOperations() {
return (RemoteFileOperations) operations;
}