You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/04/21 08:08:06 UTC
svn commit: r936165 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/component/file/GenericFileConsumer.java
test/java/org/apache/camel/component/file/FileBeginFailureOneTimeTest.java
Author: davsclaus
Date: Wed Apr 21 06:08:05 2010
New Revision: 936165
URL: http://svn.apache.org/viewvc?rev=936165&view=rev
Log:
CAMEL-2662: Fixed an issue where, if the file begin strategy threw an exception, the in-progress file was not removed from the internal cache
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBeginFailureOneTimeTest.java (with props)
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=936165&r1=936164&r2=936165&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Wed Apr 21 06:08:05 2010
@@ -218,6 +218,8 @@ public abstract class GenericFileConsume
// must extract the absolute name before the begin strategy as the file could potentially be pre moved
// and then the file name would be changed
String absoluteFileName = file.getAbsoluteFilePath();
+
+ // check if we can begin processing the file
try {
final GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy();
@@ -226,17 +228,24 @@ public abstract class GenericFileConsume
if (log.isDebugEnabled()) {
log.debug(endpoint + " cannot begin processing file: " + file);
}
- // remove file from the in progress list as its no longer in progress
+ // begin returned false, so remove file from the in progress list as it's no longer in progress
endpoint.getInProgressRepository().remove(absoluteFileName);
return;
}
+ } catch (Exception e) {
+ if (log.isDebugEnabled()) {
+ log.debug(endpoint + " cannot begin processing file: " + file + " due to: " + e.getMessage(), e);
+ }
+ endpoint.getInProgressRepository().remove(absoluteFileName);
+ return;
+ }
- // must use file from exchange as it can be updated due the
- // preMoveNamePrefix/preMoveNamePostfix options
- final GenericFile<T> target = getExchangeFileProperty(exchange);
- // must use full name when downloading so we have the correct path
- final String name = target.getAbsoluteFilePath();
-
+ // must use file from exchange as it can be updated due the
+ // preMoveNamePrefix/preMoveNamePostfix options
+ final GenericFile<T> target = getExchangeFileProperty(exchange);
+ // must use full name when downloading so we have the correct path
+ final String name = target.getAbsoluteFilePath();
+ try {
// retrieve the file using the stream
if (log.isTraceEnabled()) {
log.trace("Retrieving file: " + name + " from: " + endpoint);
@@ -248,18 +257,23 @@ public abstract class GenericFileConsume
log.trace("Retrieved file: " + name + " from: " + endpoint);
}
- if (log.isDebugEnabled()) {
- log.debug("About to process file: " + target + " using exchange: " + exchange);
- }
-
// register on completion callback that does the completion strategies
// (for instance to move the file after we have processed it)
exchange.addOnCompletion(new GenericFileOnCompletion<T>(endpoint, operations, target, absoluteFileName));
+ if (log.isDebugEnabled()) {
+ log.debug("About to process file: " + target + " using exchange: " + exchange);
+ }
+
// process the exchange
getProcessor().process(exchange);
} catch (Exception e) {
+ // remove file from the in progress list due to failure
+ // (cannot be in a finally block because GenericFileOnCompletion will remove it
+ // from the in progress list when it takes over and processes the file, which may happen
+ // on another thread at a later time. So it's only safe to remove it if there was an exception)
+ endpoint.getInProgressRepository().remove(absoluteFileName);
handleException(e);
}
}
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBeginFailureOneTimeTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBeginFailureOneTimeTest.java?rev=936165&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBeginFailureOneTimeTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBeginFailureOneTimeTest.java Wed Apr 21 06:08:05 2010
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import java.io.File;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+
+/** Regression test for CAMEL-2662: a file must still be delivered after the process strategy's begin() throws once.
+ * @version $Revision$
+ */
+public class FileBeginFailureOneTimeTest extends ContextTestSupport {
+
+ private MyStrategy myStrategy = new MyStrategy(); // kept as a field so the test can read its invocation count
+
+ @Override
+ protected void setUp() throws Exception {
+ deleteDirectory("target/begin"); // clear the consumed directory before the parent setUp() runs
+ super.setUp();
+ }
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry jndi = super.createRegistry();
+ jndi.bind("myStrategy", myStrategy); // referenced from the route URI as #myStrategy
+ return jndi;
+ }
+
+ public void testBeginFailureOneTime() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(1); // exactly one message is expected even though begin() throws on its first call
+
+ template.sendBodyAndHeader("file://target/begin", "Hello World", Exchange.FILE_NAME, "hello.txt"); // sends hello.txt to the directory the route reads from
+
+ assertMockEndpointsSatisfied();
+
+ assertEquals("Begin should have been invoked 2 times", 2, myStrategy.getInvoked()); // one throwing call plus one successful call
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("file://target/begin?processStrategy=#myStrategy") // consume using the strategy bound in createRegistry()
+ .convertBodyTo(String.class)
+ .to("mock:result");
+ }
+ };
+ }
+
+ private class MyStrategy implements GenericFileProcessStrategy<File> { // stub strategy: begin() throws on its first call only; the other callbacks do nothing
+
+ private int invoked; // number of begin() calls so far
+
+ public void prepareOnStartup(GenericFileOperations<File> fileGenericFileOperations, GenericFileEndpoint<File> fileGenericFileEndpoint) throws Exception { // intentionally empty
+ }
+
+ public boolean begin(GenericFileOperations<File> fileGenericFileOperations, GenericFileEndpoint<File> fileGenericFileEndpoint, Exchange exchange, GenericFile<File> fileGenericFile) throws Exception {
+ invoked++;
+ if (invoked <= 1) { // fail the first invocation only
+ throw new IllegalArgumentException("Damn I cannot do this");
+ }
+ return true; // every later invocation succeeds
+ }
+
+ public void commit(GenericFileOperations<File> fileGenericFileOperations, GenericFileEndpoint<File> fileGenericFileEndpoint, Exchange exchange, GenericFile<File> fileGenericFile) throws Exception { // intentionally empty
+ }
+
+ public void rollback(GenericFileOperations<File> fileGenericFileOperations, GenericFileEndpoint<File> fileGenericFileEndpoint, Exchange exchange, GenericFile<File> fileGenericFile) throws Exception { // intentionally empty
+ }
+
+ public int getInvoked() {
+ return invoked;
+ }
+ }
+
+}
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBeginFailureOneTimeTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBeginFailureOneTimeTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date