You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2008/05/25 14:20:59 UTC

svn commit: r659974 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/component/file/ main/java/org/apache/camel/processor/exceptionpolicy/ test/java/org/apache/camel/component/file/

Author: davsclaus
Date: Sun May 25 05:20:58 2008
New Revision: 659974

URL: http://svn.apache.org/viewvc?rev=659974&view=rev
Log:
CAMEL-548: The file strategy will now commit if the exchange was handled by the failure processor (DeadLetterChannel); otherwise Camel would hold an infinite lock on files that were, after all, still processed somehow

Added:
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerFailureHandledTest.java   (with props)
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/exceptionpolicy/DefaultExceptionPolicyStrategy.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java?rev=659974&r1=659973&r2=659974&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java Sun May 25 05:20:58 2008
@@ -21,6 +21,7 @@
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Processor;
+import org.apache.camel.processor.DeadLetterChannel;
 import org.apache.camel.impl.ScheduledPollConsumer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -104,8 +105,7 @@
         if (!isValidFile(file)) {
             return 0;
         }
-        // we only care about file modified times if we are not deleting/moving
-        // files
+        // we only care about file modified times if we are not deleting/moving files
         if (!endpoint.isNoop()) {
             if (filesBeingProcessed.contains(file)) {
                 return 1;
@@ -127,15 +127,21 @@
                 // the exchange can happen asynchronously
                 getAsyncProcessor().process(exchange, new AsyncCallback() {
                     public void done(boolean sync) {
-                        if (exchange.getException() == null) {
-                            try {
-                                processStrategy.commit(endpoint, exchange, file);
-                            } catch (Exception e) {
-                                handleException(e);
-                            }
-                        } else {
+                        boolean failed = exchange.isFailed();
+                        boolean handled = DeadLetterChannel.isFailureHandled(exchange);
+
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Done processing file: " + file + ". Status is: " + (failed ? "failed: " + failed + ", handled by failure processor: " + handled : "OK"));
+                        }
+
+                        if (!failed || handled) {
+                            // commit the file strategy if there was no failure or already handled by the DeadLetterChannel
+                            processStrategyCommit(processStrategy, exchange, file, handled);
+                        } else if (failed && !handled) {
+                            // there was an exception but it was not handled by the DeadLetterChannel
                             handleException(exchange.getException());
                         }
+
                         filesBeingProcessed.remove(file);
                     }
                 });
@@ -148,9 +154,33 @@
         } catch (Throwable e) {
             handleException(e);
         }
+
         return 1;
     }
 
+    /**
+     * Strategy when the file was processed and a commit should be executed.
+     *
+     * @param processStrategy   the strategy to perform the commit
+     * @param exchange          the exchange
+     * @param file              the file processed
+     * @param failureHandled    is <tt>false</tt> if the exchange was processed successfully, <tt>true</tt> if
+     * an exception occurred during processing but it was handled by the failure processor (usually the
+     * DeadLetterChannel).
+     */
+    protected void processStrategyCommit(FileProcessStrategy processStrategy, FileExchange exchange,
+                                         File file, boolean failureHandled) {
+        try {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Committing file strategy: " + processStrategy + " for file: " + file + (failureHandled ? " that was handled by the failure processor." : ""));
+            }
+            processStrategy.commit(endpoint, exchange, file);
+        } catch (Exception e) {
+            LOG.warn("Error committing file strategy: " + processStrategy, e);
+            handleException(e);
+        }
+    }
+
     protected boolean isValidFile(File file) {
         boolean result = false;
         if (file != null && file.exists()) {
@@ -162,7 +192,6 @@
         return result;
     }
 
-
     protected boolean isChanged(File file) {
         if (file == null) {
             // Sanity check

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/exceptionpolicy/DefaultExceptionPolicyStrategy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/exceptionpolicy/DefaultExceptionPolicyStrategy.java?rev=659974&r1=659973&r2=659974&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/exceptionpolicy/DefaultExceptionPolicyStrategy.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/exceptionpolicy/DefaultExceptionPolicyStrategy.java Sun May 25 05:20:58 2008
@@ -42,6 +42,10 @@
 
     public ExceptionType getExceptionPolicy(Map<Class, ExceptionType> exceptionPolicices, Exchange exchange,
                                             Throwable exception) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Finding best suited exception policy for thrown exception " + exception.getClass().getName());
+        }
+
         // the goal is to find the exception with the same/closet inheritance level as the target exception being thrown
         int targetLevel = getInheritanceLevel(exception.getClass());
         // candidate is the best candidate found so far to return
@@ -77,10 +81,14 @@
         }
 
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Using " + candidate + " as the exception policy");
+            if (candidate != null) {
+                LOG.debug("Using " + candidate + " as the exception policy");
+            } else {
+                LOG.debug("No candidate found to be used as exception policy");
+            }
         }
-        return candidate;
 
+        return candidate;
     }
 
     private static int getInheritanceLevel(Class clazz) {

Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerFailureHandledTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerFailureHandledTest.java?rev=659974&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerFailureHandledTest.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerFailureHandledTest.java Sun May 25 05:20:58 2008
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import java.io.File;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.ValidationException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.file.strategy.FileProcessStrategySupport;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * Unit test for consuming files where the exchange fails and the failure is handled
+ * by the failure handler (usually the DeadLetterChannel)
+ */
+public class FileConsumerFailureHandledTest extends ContextTestSupport {
+
+    public void testParis() throws Exception {
+        deleteDirectory("target/messages");
+
+        MockEndpoint mock = getMockEndpoint("mock:valid");
+        mock.expectedBodiesReceived("Hello Paris");
+
+        template.sendBodyAndHeader("file:target/messages/input/?delete=true", "Paris", FileComponent.HEADER_FILE_NAME, "paris.txt");
+        mock.assertIsSatisfied();
+
+        // sleep otherwise the file assertions below could fail
+        Thread.sleep(500);
+
+        asserFiles("paris.txt");
+    }
+
+    public void testLondon() throws Exception {
+        deleteDirectory("target/messages");
+
+        MockEndpoint mock = getMockEndpoint("mock:invalid");
+        // we get the original input so it's not Hello London but only London
+        mock.expectedBodiesReceived("London");
+
+        template.sendBodyAndHeader("file:target/messages/input/?delete=true", "London", FileComponent.HEADER_FILE_NAME, "london.txt");
+        mock.assertIsSatisfied();
+
+        // sleep otherwise the file assertions below could fail
+        Thread.sleep(500);
+
+        asserFiles("london.txt");
+    }
+    
+    public void testMadrid() throws Exception {
+        deleteDirectory("target/messages");
+
+        MockEndpoint mock = getMockEndpoint("mock:error");
+        // we get the original input so it's not Hello Madrid but only Madrid
+        mock.expectedBodiesReceived("Madrid");
+
+        template.sendBodyAndHeader("file:target/messages/input/?delete=true", "Madrid", FileComponent.HEADER_FILE_NAME, "madrid.txt");
+        mock.assertIsSatisfied();
+
+        // sleep otherwise the file assertions below could fail
+        Thread.sleep(500);
+
+        asserFiles("madrid.txt");
+    }
+
+    private static void asserFiles(String filename) {
+        // file should be deleted as delete=true is set as a parameter in the route below
+        File file = new File("target/messages/input/" + filename);
+        assertEquals("File " + filename + " should be deleted", false, file.exists());
+
+        // and no lock files
+        file = new File("target/messages/input/" + filename + FileProcessStrategySupport.DEFAULT_LOCK_FILE_POSTFIX);
+        assertEquals("File " + filename + " lock should be deleted", false, file.exists());
+    }
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                // make sure mock:error is the dead letter channel
+                errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(2));
+
+                // special failure handler for ValidationException
+                exception(ValidationException.class).to("mock:invalid");
+
+                // our route logic to process files from the input folder
+                from("file:target/messages/input/?delete=true").
+                    process(new MyValidatorProcessor()).
+                    to("mock:valid");
+            }
+        };
+    }
+
+    private class MyValidatorProcessor implements Processor {
+        public void process(Exchange exchange) throws Exception {
+            String body = exchange.getIn().getBody(String.class);
+            if ("London".equals(body)) {
+                throw new ValidationException(exchange, "Forced exception by unit test");
+            } else if ("Madrid".equals(body)) {
+                throw new RuntimeCamelException("Madrid is not a supported city");
+            }
+            exchange.getOut().setBody("Hello " + body);
+        }
+    }
+    
+}

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerFailureHandledTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerFailureHandledTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date