You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/08/10 09:45:39 UTC

svn commit: r802679 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/component/file/ main/java/org/apache/camel/processor/ test/java/org/apache/camel/component/file/

Author: davsclaus
Date: Mon Aug 10 07:45:38 2009
New Revision: 802679

URL: http://svn.apache.org/viewvc?rev=802679&view=rev
Log:
CAMEL-1895: poll enrich from a file based endpoint is currently not supported if the starting endpoint is also file based.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileTest.java
      - copied, changed from r802661, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileUsingProcessorTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=802679&r1=802678&r2=802679&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Mon Aug 10 07:45:38 2009
@@ -192,7 +192,7 @@
 
             // register on completion callback that does the completion strategies
             // (for instance to move the file after we have processed it)
-            exchange.addOnCompletion(new GenericFileOnCompletion<T>(endpoint, operations));
+            exchange.addOnCompletion(new GenericFileOnCompletion<T>(endpoint, operations, target));
 
             // process the exchange
             getProcessor().process(exchange);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java?rev=802679&r1=802678&r2=802679&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java Mon Aug 10 07:45:38 2009
@@ -37,10 +37,12 @@
     private GenericFileEndpoint<T> endpoint;
     private GenericFileOperations<T> operations;
     private ExceptionHandler exceptionHandler;
+    private GenericFile<T> file;
 
-    public GenericFileOnCompletion(GenericFileEndpoint<T> endpoint, GenericFileOperations<T> operations) {
+    public GenericFileOnCompletion(GenericFileEndpoint<T> endpoint, GenericFileOperations<T> operations, GenericFile<T> file) {
         this.endpoint = endpoint;
         this.operations = operations;
+        this.file = file;
     }
 
     @SuppressWarnings("unchecked")
@@ -67,10 +69,6 @@
     protected void onCompletion(Exchange exchange) {
         GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy();
 
-        // after processing
-        final GenericFile<T> file = (GenericFile<T>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
-        boolean failed = exchange.isFailed();
-
         if (log.isDebugEnabled()) {
             log.debug("Done processing file: " + file + " using exchange: " + exchange);
         }
@@ -78,6 +76,7 @@
         // commit or rollback
         boolean committed = false;
         try {
+            boolean failed = exchange.isFailed();
             if (!failed) {
                 // commit the file strategy if there was no failure or already handled by the DeadLetterChannel
                 processStrategyCommit(processStrategy, exchange, file);
@@ -116,7 +115,7 @@
 
         try {
             if (log.isTraceEnabled()) {
-                log.trace("Committing remote file strategy: " + processStrategy + " for file: " + file);
+                log.trace("Commit file strategy: " + processStrategy + " for file: " + file);
             }
             processStrategy.commit(operations, endpoint, exchange, file);
         } catch (Exception e) {
@@ -134,11 +133,8 @@
     protected void processStrategyRollback(GenericFileProcessStrategy<T> processStrategy,
                                            Exchange exchange, GenericFile<T> file) {
 
-        // only WARN in case we do not handle it ourself by moving failed files
-        if (endpoint.getMoveFailed() == null) {
-            if (log.isWarnEnabled()) {
-                log.warn("Rolling back remote file strategy: " + processStrategy + " for file: " + file);
-            }
+        if (log.isWarnEnabled()) {
+            log.warn("Rollback file strategy: " + processStrategy + " for file: " + file);
         }
         try {
             processStrategy.rollback(operations, endpoint, exchange, file);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java?rev=802679&r1=802678&r2=802679&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java Mon Aug 10 07:45:38 2009
@@ -20,7 +20,7 @@
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
-import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.EventDrivenPollingConsumer;
 import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.util.ExchangeHelper;
@@ -111,6 +111,8 @@
      * @param exchange input data.
      */
     public void process(Exchange exchange) throws Exception {
+        preChceckPoll(exchange);
+
         Exchange resourceExchange;
         if (timeout < 0) {
             if (LOG.isDebugEnabled()) {
@@ -156,6 +158,28 @@
     }
 
     /**
+     * Strategy to pre check polling.
+     * <p/>
+     * Is currently used to prevent doing poll enrich from a file based endpoint when the current route also
+     * started from a file based endpoint as that is not currently supported.
+     *
+     * @param exchange the current exchange
+     */
+    protected void preChceckPoll(Exchange exchange) throws Exception {
+        // cannot poll a file endpoint if already consuming from a file endpoint (CAMEL-1895)
+        if (consumer instanceof EventDrivenPollingConsumer) {
+            EventDrivenPollingConsumer edpc = (EventDrivenPollingConsumer) consumer;
+            boolean fileBasedConsumer = edpc.getEndpoint().getEndpointKey().startsWith("file") || edpc.getEndpoint().getEndpointKey().startsWith("ftp");
+            // NOTE(review): the from endpoint is assumed to be absent on some exchanges; treat that as not file based rather than fail with a NullPointerException
+            boolean fileBasedExchange = exchange.getFromEndpoint() != null && (exchange.getFromEndpoint().getEndpointUri().startsWith("file") || exchange.getFromEndpoint().getEndpointUri().startsWith("ftp"));
+            if (fileBasedConsumer && fileBasedExchange) {
+                throw new IllegalArgumentException("Camel currently does not support pollEnrich from a file/ftp endpoint when the route also started from a file/ftp endpoint."
+                        + " Started from: " + exchange.getFromEndpoint().getEndpointUri() + " pollEnrich: " + edpc.getEndpoint().getEndpointUri());
+            }
+        }
+    }
+
+    /**
      * Creates a new {@link org.apache.camel.impl.DefaultExchange} instance from the given
      * <code>exchange</code>. The resulting exchange's pattern is defined by
      * <code>pattern</code>.

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileTest.java (from r802661, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileUsingProcessorTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileUsingProcessorTest.java&r1=802661&r2=802679&rev=802679&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileUsingProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileTest.java Mon Aug 10 07:45:38 2009
@@ -16,19 +16,15 @@
  */
 package org.apache.camel.component.file;
 
-import org.apache.camel.CamelExchangeException;
-import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.util.FileUtil;
 
 /**
  * @version $Revision$
  */
-public class FileConsumePollEnrichFileUsingProcessorTest extends ContextTestSupport {
+public class FileConsumePollEnrichFileTest extends ContextTestSupport {
 
     @Override
     protected void setUp() throws Exception {
@@ -37,7 +33,9 @@
         super.setUp();
     }
 
-    public void testPollEnrich() throws Exception {
+
+    // TODO: CAMEL-1895
+    public void xxxTestPollEnrich() throws Exception {
         getMockEndpoint("mock:start").expectedBodiesReceived("Start");
 
         MockEndpoint mock = getMockEndpoint("mock:result");
@@ -45,50 +43,28 @@
 
         mock.expectedFileExists("target/enrich/.done/AAA.fin");
         mock.expectedFileExists("target/enrichdata/.done/AAA.dat");
-        mock.expectedFileExists("target/enrichdata/BBB.dat");
 
         template.sendBodyAndHeader("file://target/enrichdata", "Big file", Exchange.FILE_NAME, "AAA.dat");
-        template.sendBodyAndHeader("file://target/enrichdata", "Other Big file", Exchange.FILE_NAME, "BBB.dat");
         template.sendBodyAndHeader("file://target/enrich", "Start", Exchange.FILE_NAME, "AAA.fin");
 
         assertMockEndpointsSatisfied();
     }
 
+    public void testNothing() {
+        //
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("file://target/enrich?move=.done")
-                    .process(new Processor() {
-                        public void process(Exchange exchange) throws Exception {
-                            String name = exchange.getIn().getHeader(Exchange.FILE_NAME_ONLY, String.class);
-                            name = FileUtil.stripExt(name) + ".dat";
-
-                            // use a consumer template to get the data file
-                            Exchange data = null;
-                            ConsumerTemplate con = exchange.getContext().createConsumerTemplate();
-                            try {
-                                // try to get the data file
-                                data = con.receive("file://target/enrichdata?move=.done&fileName=" + name, 5000);
-                            } finally {
-                                // stop the consumer as it does not need to poll for files anymore
-                                con.stop();
-                            }
-
-                            // if we found the data file then process it by sending it to the direct:data endpoint
-                            if (data != null) {
-                                template.send("direct:data", data);
-                            } else {
-                                // otherwise do a rollback
-                                throw new CamelExchangeException("Cannot find the data file " + name, exchange);
-                            }
-                        }
-                    }).to("mock:start");
-
-                from("direct:data")
+                from("file://target/enrich?move=.done&readLock=none")
+                    .to("mock:start")
+                    .pollEnrich("file://target/enrichdata?move=.done&readLock=none")
                     .to("mock:result");
             }
         };
     }
-}
+
+}
\ No newline at end of file