You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/06/28 14:16:58 UTC

svn commit: r1140555 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/component/file/ components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/

Author: davsclaus
Date: Tue Jun 28 12:16:57 2011
New Revision: 1140555

URL: http://svn.apache.org/viewvc?rev=1140555&view=rev
Log:
CAMEL-3655: Reverted back of previous work on this as its not needed anymore.

Removed:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumerSupport.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
    camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1140555&r1=1140554&r2=1140555&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Tue Jun 28 12:16:57 2011
@@ -42,9 +42,9 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class GenericFileConsumer<T> extends ScheduledPollConsumer implements BatchConsumer, ShutdownAware {
     protected final transient Logger log = LoggerFactory.getLogger(getClass());
-    protected final ProcessFile processFile;
     protected GenericFileEndpoint<T> endpoint;
     protected GenericFileOperations<T> operations;
+    protected boolean loggedIn;
     protected String fileExpressionResult;
     protected int maxMessagesPerPoll;
     protected volatile ShutdownRunningTask shutdownRunningTask;
@@ -54,13 +54,6 @@ public abstract class GenericFileConsume
         super(endpoint, processor);
         this.endpoint = endpoint;
         this.operations = operations;
-        this.processFile = new ProcessFile(this);
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public GenericFileEndpoint<T> getEndpoint() {
-        return (GenericFileEndpoint<T>) super.getEndpoint();
     }
 
     /**
@@ -81,7 +74,7 @@ public abstract class GenericFileConsume
 
         // gather list of files to process
         List<GenericFile<T>> files = new ArrayList<GenericFile<T>>();
-        String name = getEndpoint().getConfiguration().getDirectory();
+        String name = endpoint.getConfiguration().getDirectory();
 
         // time how long time it takes to poll
         StopWatch stop = new StopWatch();
@@ -97,21 +90,21 @@ public abstract class GenericFileConsume
         }
 
         // sort files using file comparator if provided
-        if (getEndpoint().getSorter() != null) {
-            Collections.sort(files, getEndpoint().getSorter());
+        if (endpoint.getSorter() != null) {
+            Collections.sort(files, endpoint.getSorter());
         }
 
         // sort using build in sorters so we can use expressions
         LinkedList<Exchange> exchanges = new LinkedList<Exchange>();
         for (GenericFile<T> file : files) {
-            Exchange exchange = getEndpoint().createExchange(file);
-            getEndpoint().configureExchange(exchange);
-            getEndpoint().configureMessage(file, exchange.getIn());
+            Exchange exchange = endpoint.createExchange(file);
+            endpoint.configureExchange(exchange);
+            endpoint.configureMessage(file, exchange.getIn());
             exchanges.add(exchange);
         }
         // sort files using exchange comparator if provided
-        if (getEndpoint().getSortBy() != null) {
-            Collections.sort(exchanges, getEndpoint().getSortBy());
+        if (endpoint.getSortBy() != null) {
+            Collections.sort(exchanges, endpoint.getSortBy());
         }
 
         // consume files one by one
@@ -163,7 +156,7 @@ public abstract class GenericFileConsume
             Exchange exchange = (Exchange) exchanges.poll();
             GenericFile<T> file = (GenericFile<T>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
             String key = file.getAbsoluteFilePath();
-            getEndpoint().getInProgressRepository().remove(key);
+            endpoint.getInProgressRepository().remove(key);
         }
 
         return total;
@@ -258,22 +251,83 @@ public abstract class GenericFileConsume
     }
 
     /**
-     * Gets the operations to be used
-     *
-     * @return the operations
-     */
-    public GenericFileOperations<T> getOperations() {
-        return operations;
-    }
-
-    /**
      * Processes the exchange
      *
      * @param exchange the exchange
      */
     protected void processExchange(final Exchange exchange) {
-        // let the process do the work
-        processFile.processExchange(exchange);
+        GenericFile<T> file = getExchangeFileProperty(exchange);
+        log.trace("Processing file: {}", file);
+
+        // must extract the absolute name before the begin strategy as the file could potentially be pre moved
+        // and then the file name would be changed
+        String absoluteFileName = file.getAbsoluteFilePath();
+
+        // check if we can begin processing the file
+        try {
+            final GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy();
+
+            boolean begin = processStrategy.begin(operations, endpoint, exchange, file);
+            if (!begin) {
+                log.debug(endpoint + " cannot begin processing file: {}", file);
+                // begin returned false, so remove file from the in progress list as its no longer in progress
+                endpoint.getInProgressRepository().remove(absoluteFileName);
+                return;
+            }
+        } catch (Exception e) {
+            if (log.isDebugEnabled()) {
+                log.debug(endpoint + " cannot begin processing file: " + file + " due to: " + e.getMessage(), e);
+            }
+            endpoint.getInProgressRepository().remove(absoluteFileName);
+            return;
+        }
+
+        // must use file from exchange as it can be updated due the
+        // preMoveNamePrefix/preMoveNamePostfix options
+        final GenericFile<T> target = getExchangeFileProperty(exchange);
+        // must use full name when downloading so we have the correct path
+        final String name = target.getAbsoluteFilePath();
+        try {
+            // retrieve the file using the stream
+            log.trace("Retrieving file: {} from: {}", name, endpoint);
+
+            // retrieve the file and check it was a success
+            boolean retrieved = operations.retrieveFile(name, exchange);
+            if (!retrieved) {
+                // throw exception to handle the problem with retrieving the file
+                // then if the method return false or throws an exception is handled the same in here
+                // as in both cases an exception is being thrown
+                throw new GenericFileOperationFailedException("Cannot retrieve file: " + file + " from: " + endpoint);
+            }
+
+            log.trace("Retrieved file: {} from: {}", name, endpoint);
+
+            // register on completion callback that does the completion strategies
+            // (for instance to move the file after we have processed it)
+            exchange.addOnCompletion(new GenericFileOnCompletion<T>(endpoint, operations, target, absoluteFileName));
+
+            log.debug("About to process file: {} using exchange: {}", target, exchange);
+
+            // process the exchange using the async consumer to support async routing engine
+            // which can be supported by this file consumer as all the done work is
+            // provided in the GenericFileOnCompletion
+            getAsyncProcessor().process(exchange, new AsyncCallback() {
+                public void done(boolean doneSync) {
+                    // noop
+                    if (log.isTraceEnabled()) {
+                        log.trace("Done processing file: {} {}", target, doneSync ? "synchronously" : "asynchronously");
+                    }
+                }
+            });
+
+        } catch (Exception e) {
+            // remove file from the in progress list due to failure
+            // (cannot be in finally block due to GenericFileOnCompletion will remove it
+            // from in progress when it takes over and processes the file, which may happen
+            // by another thread at a later time. So its only safe to remove it if there was an exception)
+            endpoint.getInProgressRepository().remove(absoluteFileName);
+            handleException(e);
+        }
     }
 
     /**
@@ -287,7 +341,7 @@ public abstract class GenericFileConsume
         if (!isMatched(file, isDirectory)) {
             log.trace("File did not match. Will skip this file: {}", file);
             return false;
-        } else if (getEndpoint().isIdempotent() && getEndpoint().getIdempotentRepository().contains(file.getAbsoluteFilePath())) {
+        } else if (endpoint.isIdempotent() && endpoint.getIdempotentRepository().contains(file.getAbsoluteFilePath())) {
             log.trace("This consumer is idempotent and the file has been consumed before. Will skip this file: {}", file);
             return false;
         }
@@ -328,26 +382,26 @@ public abstract class GenericFileConsume
             return true;
         }
 
-        if (getEndpoint().getFilter() != null) {
-            if (!getEndpoint().getFilter().accept(file)) {
+        if (endpoint.getFilter() != null) {
+            if (!endpoint.getFilter().accept(file)) {
                 return false;
             }
         }
 
-        if (ObjectHelper.isNotEmpty(getEndpoint().getExclude())) {
-            if (name.matches(getEndpoint().getExclude())) {
+        if (ObjectHelper.isNotEmpty(endpoint.getExclude())) {
+            if (name.matches(endpoint.getExclude())) {
                 return false;
             }
         }
 
-        if (ObjectHelper.isNotEmpty(getEndpoint().getInclude())) {
-            if (!name.matches(getEndpoint().getInclude())) {
+        if (ObjectHelper.isNotEmpty(endpoint.getInclude())) {
+            if (!name.matches(endpoint.getInclude())) {
                 return false;
             }
         }
 
         // use file expression for a simple dynamic file filter
-        if (getEndpoint().getFileName() != null) {
+        if (endpoint.getFileName() != null) {
             evaluateFileExpression();
             if (fileExpressionResult != null) {
                 if (!name.equals(fileExpressionResult)) {
@@ -357,19 +411,19 @@ public abstract class GenericFileConsume
         }
 
         // if done file name is enabled, then the file is only valid if a done file exists
-        if (getEndpoint().getDoneFileName() != null) {
+        if (endpoint.getDoneFileName() != null) {
             // done file must be in same path as the file
-            String doneFileName = getEndpoint().createDoneFileName(file.getAbsoluteFilePath());
-            ObjectHelper.notEmpty(doneFileName, "doneFileName", getEndpoint());
+            String doneFileName = endpoint.createDoneFileName(file.getAbsoluteFilePath());
+            ObjectHelper.notEmpty(doneFileName, "doneFileName", endpoint);
 
             // is it a done file name?
-            if (getEndpoint().isDoneFile(file.getFileNameOnly())) {
+            if (endpoint.isDoneFile(file.getFileNameOnly())) {
                 log.trace("Skipping done file: {}", file);
                 return false;
             }
 
             // the file is only valid if the done file exist
-            if (!getOperations().existsFile(doneFileName)) {
+            if (!operations.existsFile(doneFileName)) {
                 log.trace("Done file: {} does not exist", doneFileName);
                 return false;
             }
@@ -386,13 +440,13 @@ public abstract class GenericFileConsume
      */
     protected boolean isInProgress(GenericFile<T> file) {
         String key = file.getAbsoluteFilePath();
-        return !getEndpoint().getInProgressRepository().add(key);
+        return !endpoint.getInProgressRepository().add(key);
     }
 
     private void evaluateFileExpression() {
         if (fileExpressionResult == null) {
             // create a dummy exchange as Exchange is needed for expression evaluation
-            Exchange dummy = new DefaultExchange(getEndpoint().getCamelContext());
+            Exchange dummy = new DefaultExchange(endpoint.getCamelContext());
             fileExpressionResult = endpoint.getFileName().evaluate(dummy, String.class);
         }
     }
@@ -407,35 +461,6 @@ public abstract class GenericFileConsume
         super.doStart();
         
         // prepare on startup
-        getEndpoint().getGenericFileProcessStrategy().prepareOnStartup(getOperations(), getEndpoint());
+        endpoint.getGenericFileProcessStrategy().prepareOnStartup(operations, endpoint);
     }
-
-    /**
-     * Class the processes the exchange when a file has been polled.
-     */
-    private class ProcessFile extends GenericFileConsumerSupport<T> {
-
-        public ProcessFile(GenericFileConsumer<T> consumer) {
-            super(consumer);
-        }
-
-        @Override
-        void handleExceptionStrategy(Exception e) {
-            // handle the exception on the consumer
-            handleException(e);
-        }
-
-        @Override
-        void processFileStrategy(Exchange exchange) {
-            // process the exchange using the async consumer to support async routing engine
-            // which can be supported by this file consumer as all the done work is
-            // provided in the GenericFileOnCompletion
-            getAsyncProcessor().process(exchange, new AsyncCallback() {
-                public void done(boolean doneSync) {
-                    // noop
-                }
-            });
-        }
-    }
-
 }

Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java?rev=1140555&r1=1140554&r2=1140555&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java Tue Jun 28 12:16:57 2011
@@ -39,7 +39,7 @@ public abstract class RemoteFileConsumer
         return (RemoteFileEndpoint<T>) super.getEndpoint();
     }
 
-    public RemoteFileOperations getOperations() {
+    protected RemoteFileOperations getOperations() {
         return (RemoteFileOperations) operations;
     }