You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/04/21 08:08:06 UTC

svn commit: r936165 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/component/file/GenericFileConsumer.java test/java/org/apache/camel/component/file/FileBeginFailureOneTimeTest.java

Author: davsclaus
Date: Wed Apr 21 06:08:05 2010
New Revision: 936165

URL: http://svn.apache.org/viewvc?rev=936165&view=rev
Log:
CAMEL-2662: Fixed issue if file begin threw exception which cause in progress file not to be removed from internal cache

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBeginFailureOneTimeTest.java   (with props)
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=936165&r1=936164&r2=936165&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Wed Apr 21 06:08:05 2010
@@ -218,6 +218,8 @@ public abstract class GenericFileConsume
         // must extract the absolute name before the begin strategy as the file could potentially be pre moved
         // and then the file name would be changed
         String absoluteFileName = file.getAbsoluteFilePath();
+
+        // check if we can begin processing the file
         try {
             final GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy();
 
@@ -226,17 +228,24 @@ public abstract class GenericFileConsume
                 if (log.isDebugEnabled()) {
                     log.debug(endpoint + " cannot begin processing file: " + file);
                 }
-                // remove file from the in progress list as its no longer in progress
+                // begin returned false, so remove file from the in progress list as its no longer in progress
                 endpoint.getInProgressRepository().remove(absoluteFileName);
                 return;
             }
+        } catch (Exception e) {
+            if (log.isDebugEnabled()) {
+                log.debug(endpoint + " cannot begin processing file: " + file + " due to: " + e.getMessage(), e);
+            }
+            endpoint.getInProgressRepository().remove(absoluteFileName);
+            return;
+        }
 
-            // must use file from exchange as it can be updated due the
-            // preMoveNamePrefix/preMoveNamePostfix options
-            final GenericFile<T> target = getExchangeFileProperty(exchange);
-            // must use full name when downloading so we have the correct path
-            final String name = target.getAbsoluteFilePath();
-
+        // must use file from exchange as it can be updated due the
+        // preMoveNamePrefix/preMoveNamePostfix options
+        final GenericFile<T> target = getExchangeFileProperty(exchange);
+        // must use full name when downloading so we have the correct path
+        final String name = target.getAbsoluteFilePath();
+        try {
             // retrieve the file using the stream
             if (log.isTraceEnabled()) {
                 log.trace("Retrieving file: " + name + " from: " + endpoint);
@@ -248,18 +257,23 @@ public abstract class GenericFileConsume
                 log.trace("Retrieved file: " + name + " from: " + endpoint);
             }
 
-            if (log.isDebugEnabled()) {
-                log.debug("About to process file: " + target + " using exchange: " + exchange);
-            }
-
             // register on completion callback that does the completion strategies
             // (for instance to move the file after we have processed it)
             exchange.addOnCompletion(new GenericFileOnCompletion<T>(endpoint, operations, target, absoluteFileName));
 
+            if (log.isDebugEnabled()) {
+                log.debug("About to process file: " + target + " using exchange: " + exchange);
+            }
+
             // process the exchange
             getProcessor().process(exchange);
 
         } catch (Exception e) {
+            // remove file from the in progress list due to failure
+            // (cannot be in finally block due to GenericFileOnCompletion will remove it
+            // from in progress when it takes over and processes the file, which may happen
+            // by another thread at a later time. So its only safe to remove it if there was an exception)
+            endpoint.getInProgressRepository().remove(absoluteFileName);
             handleException(e);
         }
     }

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBeginFailureOneTimeTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBeginFailureOneTimeTest.java?rev=936165&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBeginFailureOneTimeTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBeginFailureOneTimeTest.java Wed Apr 21 06:08:05 2010
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import java.io.File;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+
+/**
+ * @version $Revision$
+ */
+public class FileBeginFailureOneTimeTest extends ContextTestSupport {
+
+    private MyStrategy myStrategy = new MyStrategy();
+
+    @Override
+    protected void setUp() throws Exception {
+        deleteDirectory("target/begin");
+        super.setUp();
+    }
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        jndi.bind("myStrategy", myStrategy);
+        return jndi;
+    }
+
+    public void testBeginFailureOneTime() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+
+        template.sendBodyAndHeader("file://target/begin", "Hello World", Exchange.FILE_NAME, "hello.txt");
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals("Begin should have been invoked 2 times", 2, myStrategy.getInvoked());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("file://target/begin?processStrategy=#myStrategy")
+                    .convertBodyTo(String.class)
+                    .to("mock:result");
+            }
+        };
+    }
+
+    private class MyStrategy implements GenericFileProcessStrategy<File> {
+
+        private int invoked;
+
+        public void prepareOnStartup(GenericFileOperations<File> fileGenericFileOperations, GenericFileEndpoint<File> fileGenericFileEndpoint) throws Exception {
+        }
+
+        public boolean begin(GenericFileOperations<File> fileGenericFileOperations, GenericFileEndpoint<File> fileGenericFileEndpoint, Exchange exchange, GenericFile<File> fileGenericFile) throws Exception {
+            invoked++;
+            if (invoked <= 1) {
+                throw new IllegalArgumentException("Damn I cannot do this");
+            }
+            return true;
+        }
+
+        public void commit(GenericFileOperations<File> fileGenericFileOperations, GenericFileEndpoint<File> fileGenericFileEndpoint, Exchange exchange, GenericFile<File> fileGenericFile) throws Exception {
+        }
+
+        public void rollback(GenericFileOperations<File> fileGenericFileOperations, GenericFileEndpoint<File> fileGenericFileEndpoint, Exchange exchange, GenericFile<File> fileGenericFile) throws Exception {
+        }
+
+        public int getInvoked() {
+            return invoked;
+        }
+    }
+
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBeginFailureOneTimeTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBeginFailureOneTimeTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date