You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ha...@apache.org on 2008/08/15 02:05:49 UTC

svn commit: r686094 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/component/file/ main/java/org/apache/camel/component/file/strategy/ main/java/org/apache/camel/impl/ test/java/org/apache/camel/component/file/

Author: hadrian
Date: Thu Aug 14 17:05:48 2008
New Revision: 686094

URL: http://svn.apache.org/viewvc?rev=686094&view=rev
Log:
CAMEL-826.  Added rollback method to FileProcessStrategy.
Fix for null exception passed to handleException.


Added:
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteExceptionTest.java
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileProcessStrategy.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategySupport.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/NoOpFileProcessStrategy.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileFilterOnNameRouteTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileNoOpRouteTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRenameRouteTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRouteTest.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java?rev=686094&r1=686093&r2=686094&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java Thu Aug 14 17:05:48 2008
@@ -158,15 +158,22 @@
                             LOG.debug("Done processing file: " + file + ". Status is: " + (failed ? "failed: " + failed + ", handled by failure processor: " + handled : "processed OK"));
                         }
 
-                        if (!failed || handled) {
-                            // commit the file strategy if there was no failure or already handled by the DeadLetterChannel
-                            processStrategyCommit(processStrategy, exchange, file, handled);
-                        } else if (failed && !handled) {
-                            // there was an exception but it was not handled by the DeadLetterChannel
-                            handleException(exchange.getException());
+                        boolean committed = false;
+                        try {
+                            if (!failed || handled) {
+                                // commit the file strategy if there was no failure or already handled by the DeadLetterChannel
+                                processStrategyCommit(processStrategy, exchange, file, handled);
+                                committed = true;
+                            } else {
+                                // there was an exception but it was not handled by the DeadLetterChannel
+                                handleException(exchange.getException());
+                            }
+                        } finally {
+                            if (!committed) {
+                                processStrategyRollback(processStrategy, exchange, file);
+                            }
+                            filesBeingProcessed.remove(file);
                         }
-
-                        filesBeingProcessed.remove(file);
                     }
                 });
 
@@ -230,6 +237,20 @@
         }
     }
 
+    /**
+     * Strategy when the file was not processed and a rollback should be executed.
+     *
+     * @param processStrategy   the strategy to perform the commit
+     * @param exchange          the exchange
+     * @param file              the file processed
+     */
+    protected void processStrategyRollback(FileProcessStrategy processStrategy, FileExchange exchange, File file) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Rolling back file strategy: " + processStrategy + " for file: " + file);
+        }
+        processStrategy.rollback(endpoint, exchange, file);
+    }
+
     protected boolean isValidFile(File file) {
         boolean result = false;
         if (file != null && file.exists()) {

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileProcessStrategy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileProcessStrategy.java?rev=686094&r1=686093&r2=686094&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileProcessStrategy.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileProcessStrategy.java Thu Aug 14 17:05:48 2008
@@ -38,7 +38,7 @@
     boolean begin(FileEndpoint endpoint, FileExchange exchange, File file) throws Exception;
 
     /**
-     * Releases any file locks and possibly deletes or moves the file
+     * Releases any file locks and possibly deletes or moves the file after successful processing
      *
      * @param endpoint  the endpoint
      * @param exchange  the exchange
@@ -47,4 +47,13 @@
      */
     void commit(FileEndpoint endpoint, FileExchange exchange, File file) throws Exception;
     
+    /**
+     * Releases any file locks and possibly deletes or moves the file after unsuccessful processing
+     *
+     * @param endpoint  the endpoint
+     * @param exchange  the exchange
+     * @param file      the file
+     */
+    void rollback(FileEndpoint endpoint, FileExchange exchange, File file);
+    
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategySupport.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategySupport.java?rev=686094&r1=686093&r2=686094&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategySupport.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategySupport.java Thu Aug 14 17:05:48 2008
@@ -74,18 +74,17 @@
     }
 
     public void commit(FileEndpoint endpoint, FileExchange exchange, File file) throws Exception {
-        if (isLockFile()) {
-            Channel channel = ExchangeHelper.getMandatoryProperty(exchange, "org.apache.camel.fileChannel", Channel.class);
-            String lockfile = ExchangeHelper.getMandatoryProperty(exchange, "org.apache.camel.file.lock.name", String.class);
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Unlocking file: " + file);
-            }
-            channel.close();
-            File lock = new File(lockfile);
-            lock.delete();
-        }
+        unlockFile(endpoint, exchange, file);
     }
 
+	public void rollback(FileEndpoint endpoint, FileExchange exchange, File file) {
+        try {
+            unlockFile(endpoint, exchange, file);
+        } catch (Exception e) {
+            LOG.info("Unable to unlock file: " + file + ": " + e.getMessage(), e);
+        }
+	}
+
     public boolean isLockFile() {
         return lockFile;
     }
@@ -101,4 +100,17 @@
     public void setLockFileRenamer(FileRenamer lockFileRenamer) {
         this.lockFileRenamer = lockFileRenamer;
     }
+    
+    protected void unlockFile(FileEndpoint endpoint, FileExchange exchange, File file) throws Exception {
+        if (isLockFile()) {
+            Channel channel = ExchangeHelper.getMandatoryProperty(exchange, "org.apache.camel.fileChannel", Channel.class);
+            String lockfile = ExchangeHelper.getMandatoryProperty(exchange, "org.apache.camel.file.lock.name", String.class);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Unlocking file: " + file);
+            }
+            channel.close();
+            File lock = new File(lockfile);
+            lock.delete();
+        }
+    }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/NoOpFileProcessStrategy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/NoOpFileProcessStrategy.java?rev=686094&r1=686093&r2=686094&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/NoOpFileProcessStrategy.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/NoOpFileProcessStrategy.java Thu Aug 14 17:05:48 2008
@@ -30,5 +30,4 @@
     public NoOpFileProcessStrategy(boolean isLock) {
         super(isLock);
     }
-
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java?rev=686094&r1=686093&r2=686094&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java Thu Aug 14 17:05:48 2008
@@ -91,6 +91,7 @@
      * @param t the exception to handle
      */
     protected void handleException(Throwable t) {
-        getExceptionHandler().handleException(t);
+        Throwable newt = (t == null) ? new Throwable("Handling [null] exception") : t;
+        getExceptionHandler().handleException(newt);
     }
 }

Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteExceptionTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteExceptionTest.java?rev=686094&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteExceptionTest.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteExceptionTest.java Thu Aug 14 17:05:48 2008
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import java.io.File;
+
+import org.apache.camel.CamelException;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.ValidationException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version $Revision: 683664 $
+ */
+public class FileDeleteRouteExceptionTest extends FileDeleteRouteTest {
+    @Override
+    protected void setUp() throws Exception {
+        targetdir = "target/test-delete-inbox";
+        params = "?consumer.delay=1000&delete=true&consumer.recursive=true";
+        super.setUp();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from(uri).to("mock:result").throwFault("Exception while processing file...").to("mock:skip");
+            }
+        };
+    }
+}
\ No newline at end of file

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteTest.java?rev=686094&r1=686093&r2=686094&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteTest.java Thu Aug 14 17:05:48 2008
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.file;
 
+import java.io.File;
+
 import org.apache.camel.component.mock.MockEndpoint;
 
 /**
@@ -24,8 +26,8 @@
 public class FileDeleteRouteTest extends FileRouteTest {
     @Override
     protected void setUp() throws Exception {
-        uri = "file:target/test-delete-inbox?consumer.delay=1000&delete=true&consumer.recursive=true";
-        deleteDirectory("target/test-delete-inbox");
+    	targetdir = "target/test-delete-inbox";
+    	params = "?consumer.delay=1000&delete=true&consumer.recursive=true";
         super.setUp();
     }
 
@@ -36,10 +38,14 @@
         result.setResultWaitTime(5000);
 
         template.sendBodyAndHeader(uri, expectedBody, "cheese", 123);
-
         Thread.sleep(4000);
 
         result.assertIsSatisfied();
+        
+        for (String lockName: recorder.getLocks()) {
+        	File lock = new File(lockName);
+        	assertFalse(lock.exists());
+        }
     }
 
 }
\ No newline at end of file

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileFilterOnNameRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileFilterOnNameRouteTest.java?rev=686094&r1=686093&r2=686094&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileFilterOnNameRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileFilterOnNameRouteTest.java Thu Aug 14 17:05:48 2008
@@ -27,12 +27,7 @@
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                
-                // more natural
                 from(uri).filter(header(FileComponent.HEADER_FILE_NAME).contains("-")).to("mock:result");
-                
-                // than
-                //from(uri).filter(header(FileComponent.HEADER_FILE_NAME).matchesRegex(".*-.*")).to("mock:result");
             }
         };
     }

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileNoOpRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileNoOpRouteTest.java?rev=686094&r1=686093&r2=686094&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileNoOpRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileNoOpRouteTest.java Thu Aug 14 17:05:48 2008
@@ -22,8 +22,8 @@
 public class FileNoOpRouteTest extends FileRouteTest {
     @Override
     protected void setUp() throws Exception {
-        uri = "file:target/test-noop-inbox?noop=true&consumer.recursive=true";
-        deleteDirectory("target/test-noop-inbox");
+        targetdir = "target/test-noop-inbox";
+        params = "?noop=true&consumer.recursive=true";
         super.setUp();
     }
 }

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRenameRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRenameRouteTest.java?rev=686094&r1=686093&r2=686094&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRenameRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRenameRouteTest.java Thu Aug 14 17:05:48 2008
@@ -22,8 +22,8 @@
 public class FileRenameRouteTest extends FileRouteTest {
     @Override
     protected void setUp() throws Exception {
-        deleteDirectory("target/test-rename-inbox");
-        uri = "file:target/test-rename-inbox?moveNamePrefix=foo/?consumer.recursive=true";
+        targetdir = "target/test-rename-inbox";
+        params = "?moveNamePrefix=foo/?consumer.recursive=true";
         super.setUp();
     }
 }
\ No newline at end of file

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRouteTest.java?rev=686094&r1=686093&r2=686094&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRouteTest.java Thu Aug 14 17:05:48 2008
@@ -16,7 +16,11 @@
  */
 package org.apache.camel.component.file;
 
+import java.util.concurrent.ConcurrentLinkedQueue;
+
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 
@@ -25,7 +29,10 @@
  */
 public class FileRouteTest extends ContextTestSupport {
     protected Object expectedBody = "Hello there!";
-    protected String uri = "file:target/test-default-inbox?consumer.recursive=true";
+    protected String targetdir = "target/test-default-inbox";
+    protected String params = "?consumer.recursive=true";
+    protected String uri = "file:" + targetdir + params;
+    protected LockRecorderProcessor recorder = new LockRecorderProcessor();
 
     public void testFileRoute() throws Exception {
         MockEndpoint result = getMockEndpoint("mock:result");
@@ -39,7 +46,8 @@
 
     @Override
     protected void setUp() throws Exception {
-        deleteDirectory("target/test-default-inbox");
+        deleteDirectory(targetdir);
+        uri = "file:" + targetdir + params;
         super.setUp();
     }
 
@@ -47,8 +55,20 @@
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                from(uri).to("mock:result");
+                from(uri).process(recorder).to("mock:result");
             }
         };
     }
+    
+    public class LockRecorderProcessor implements Processor {
+        private ConcurrentLinkedQueue<String> locks = new ConcurrentLinkedQueue<String>();
+        
+        public ConcurrentLinkedQueue<String> getLocks() {
+            return locks;
+        }
+    	
+        public void process(Exchange exchange) {
+            locks.add(exchange.getProperty("org.apache.camel.file.lock.name", String.class));
+        }
+    }
 }