You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ha...@apache.org on 2008/08/15 02:05:49 UTC
svn commit: r686094 - in /activemq/camel/trunk/camel-core/src:
main/java/org/apache/camel/component/file/
main/java/org/apache/camel/component/file/strategy/
main/java/org/apache/camel/impl/ test/java/org/apache/camel/component/file/
Author: hadrian
Date: Thu Aug 14 17:05:48 2008
New Revision: 686094
URL: http://svn.apache.org/viewvc?rev=686094&view=rev
Log:
CAMEL-826. Added rollback method to FileProcessStrategy.
Fix for null exception passed to handleException.
Added:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteExceptionTest.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileProcessStrategy.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategySupport.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/NoOpFileProcessStrategy.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileFilterOnNameRouteTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileNoOpRouteTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRenameRouteTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRouteTest.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java?rev=686094&r1=686093&r2=686094&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java Thu Aug 14 17:05:48 2008
@@ -158,15 +158,22 @@
LOG.debug("Done processing file: " + file + ". Status is: " + (failed ? "failed: " + failed + ", handled by failure processor: " + handled : "processed OK"));
}
- if (!failed || handled) {
- // commit the file strategy if there was no failure or already handled by the DeadLetterChannel
- processStrategyCommit(processStrategy, exchange, file, handled);
- } else if (failed && !handled) {
- // there was an exception but it was not handled by the DeadLetterChannel
- handleException(exchange.getException());
+ boolean committed = false;
+ try {
+ if (!failed || handled) {
+ // commit the file strategy if there was no failure or the failure was already handled by the DeadLetterChannel
+ processStrategyCommit(processStrategy, exchange, file, handled);
+ committed = true;
+ } else {
+ // there was an exception but it was not handled by the DeadLetterChannel
+ handleException(exchange.getException());
+ }
+ } finally {
+ if (!committed) {
+ processStrategyRollback(processStrategy, exchange, file);
+ }
+ filesBeingProcessed.remove(file);
}
-
- filesBeingProcessed.remove(file);
}
});
@@ -230,6 +237,20 @@
}
}
+ /**
+ * Strategy when the file was not processed and a rollback should be executed.
+ *
+ * @param processStrategy the strategy to perform the rollback
+ * @param exchange the exchange
+ * @param file the file processed
+ */
+ protected void processStrategyRollback(FileProcessStrategy processStrategy, FileExchange exchange, File file) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Rolling back file strategy: " + processStrategy + " for file: " + file);
+ }
+ processStrategy.rollback(endpoint, exchange, file);
+ }
+
protected boolean isValidFile(File file) {
boolean result = false;
if (file != null && file.exists()) {
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileProcessStrategy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileProcessStrategy.java?rev=686094&r1=686093&r2=686094&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileProcessStrategy.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileProcessStrategy.java Thu Aug 14 17:05:48 2008
@@ -38,7 +38,7 @@
boolean begin(FileEndpoint endpoint, FileExchange exchange, File file) throws Exception;
/**
- * Releases any file locks and possibly deletes or moves the file
+ * Releases any file locks and possibly deletes or moves the file after successful processing
*
* @param endpoint the endpoint
* @param exchange the exchange
@@ -47,4 +47,13 @@
*/
void commit(FileEndpoint endpoint, FileExchange exchange, File file) throws Exception;
+ /**
+ * Releases any file locks and possibly deletes or moves the file after unsuccessful processing
+ *
+ * @param endpoint the endpoint
+ * @param exchange the exchange
+ * @param file the file
+ */
+ void rollback(FileEndpoint endpoint, FileExchange exchange, File file);
+
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategySupport.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategySupport.java?rev=686094&r1=686093&r2=686094&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategySupport.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategySupport.java Thu Aug 14 17:05:48 2008
@@ -74,18 +74,17 @@
}
public void commit(FileEndpoint endpoint, FileExchange exchange, File file) throws Exception {
- if (isLockFile()) {
- Channel channel = ExchangeHelper.getMandatoryProperty(exchange, "org.apache.camel.fileChannel", Channel.class);
- String lockfile = ExchangeHelper.getMandatoryProperty(exchange, "org.apache.camel.file.lock.name", String.class);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Unlocking file: " + file);
- }
- channel.close();
- File lock = new File(lockfile);
- lock.delete();
- }
+ unlockFile(endpoint, exchange, file);
}
+ public void rollback(FileEndpoint endpoint, FileExchange exchange, File file) {
+ try {
+ unlockFile(endpoint, exchange, file);
+ } catch (Exception e) {
+ LOG.info("Unable to unlock file: " + file + ": " + e.getMessage(), e);
+ }
+ }
+
public boolean isLockFile() {
return lockFile;
}
@@ -101,4 +100,17 @@
public void setLockFileRenamer(FileRenamer lockFileRenamer) {
this.lockFileRenamer = lockFileRenamer;
}
+
+ protected void unlockFile(FileEndpoint endpoint, FileExchange exchange, File file) throws Exception {
+ if (isLockFile()) {
+ Channel channel = ExchangeHelper.getMandatoryProperty(exchange, "org.apache.camel.fileChannel", Channel.class);
+ String lockfile = ExchangeHelper.getMandatoryProperty(exchange, "org.apache.camel.file.lock.name", String.class);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Unlocking file: " + file);
+ }
+ channel.close();
+ File lock = new File(lockfile);
+ lock.delete();
+ }
+ }
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/NoOpFileProcessStrategy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/NoOpFileProcessStrategy.java?rev=686094&r1=686093&r2=686094&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/NoOpFileProcessStrategy.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/NoOpFileProcessStrategy.java Thu Aug 14 17:05:48 2008
@@ -30,5 +30,4 @@
public NoOpFileProcessStrategy(boolean isLock) {
super(isLock);
}
-
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java?rev=686094&r1=686093&r2=686094&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java Thu Aug 14 17:05:48 2008
@@ -91,6 +91,7 @@
* @param t the exception to handle
*/
protected void handleException(Throwable t) {
- getExceptionHandler().handleException(t);
+ Throwable newt = (t == null) ? new Throwable("Handling [null] exception") : t;
+ getExceptionHandler().handleException(newt);
}
}
Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteExceptionTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteExceptionTest.java?rev=686094&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteExceptionTest.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteExceptionTest.java Thu Aug 14 17:05:48 2008
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import java.io.File;
+
+import org.apache.camel.CamelException;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.ValidationException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version $Revision: 683664 $
+ */
+public class FileDeleteRouteExceptionTest extends FileDeleteRouteTest {
+ @Override
+ protected void setUp() throws Exception {
+ targetdir = "target/test-delete-inbox";
+ params = "?consumer.delay=1000&delete=true&consumer.recursive=true";
+ super.setUp();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from(uri).to("mock:result").throwFault("Exception while processing file...").to("mock:skip");
+ }
+ };
+ }
+}
\ No newline at end of file
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteTest.java?rev=686094&r1=686093&r2=686094&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteTest.java Thu Aug 14 17:05:48 2008
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.file;
+import java.io.File;
+
import org.apache.camel.component.mock.MockEndpoint;
/**
@@ -24,8 +26,8 @@
public class FileDeleteRouteTest extends FileRouteTest {
@Override
protected void setUp() throws Exception {
- uri = "file:target/test-delete-inbox?consumer.delay=1000&delete=true&consumer.recursive=true";
- deleteDirectory("target/test-delete-inbox");
+ targetdir = "target/test-delete-inbox";
+ params = "?consumer.delay=1000&delete=true&consumer.recursive=true";
super.setUp();
}
@@ -36,10 +38,14 @@
result.setResultWaitTime(5000);
template.sendBodyAndHeader(uri, expectedBody, "cheese", 123);
-
Thread.sleep(4000);
result.assertIsSatisfied();
+
+ for (String lockName: recorder.getLocks()) {
+ File lock = new File(lockName);
+ assertFalse(lock.exists());
+ }
}
}
\ No newline at end of file
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileFilterOnNameRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileFilterOnNameRouteTest.java?rev=686094&r1=686093&r2=686094&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileFilterOnNameRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileFilterOnNameRouteTest.java Thu Aug 14 17:05:48 2008
@@ -27,12 +27,7 @@
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
-
- // more natural
from(uri).filter(header(FileComponent.HEADER_FILE_NAME).contains("-")).to("mock:result");
-
- // than
- //from(uri).filter(header(FileComponent.HEADER_FILE_NAME).matchesRegex(".*-.*")).to("mock:result");
}
};
}
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileNoOpRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileNoOpRouteTest.java?rev=686094&r1=686093&r2=686094&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileNoOpRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileNoOpRouteTest.java Thu Aug 14 17:05:48 2008
@@ -22,8 +22,8 @@
public class FileNoOpRouteTest extends FileRouteTest {
@Override
protected void setUp() throws Exception {
- uri = "file:target/test-noop-inbox?noop=true&consumer.recursive=true";
- deleteDirectory("target/test-noop-inbox");
+ targetdir = "target/test-noop-inbox";
+ params = "?noop=true&consumer.recursive=true";
super.setUp();
}
}
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRenameRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRenameRouteTest.java?rev=686094&r1=686093&r2=686094&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRenameRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRenameRouteTest.java Thu Aug 14 17:05:48 2008
@@ -22,8 +22,8 @@
public class FileRenameRouteTest extends FileRouteTest {
@Override
protected void setUp() throws Exception {
- deleteDirectory("target/test-rename-inbox");
- uri = "file:target/test-rename-inbox?moveNamePrefix=foo/?consumer.recursive=true";
+ targetdir = "target/test-rename-inbox";
+ params = "?moveNamePrefix=foo/?consumer.recursive=true";
super.setUp();
}
}
\ No newline at end of file
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRouteTest.java?rev=686094&r1=686093&r2=686094&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRouteTest.java Thu Aug 14 17:05:48 2008
@@ -16,7 +16,11 @@
*/
package org.apache.camel.component.file;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
@@ -25,7 +29,10 @@
*/
public class FileRouteTest extends ContextTestSupport {
protected Object expectedBody = "Hello there!";
- protected String uri = "file:target/test-default-inbox?consumer.recursive=true";
+ protected String targetdir = "target/test-default-inbox";
+ protected String params = "?consumer.recursive=true";
+ protected String uri = "file:" + targetdir + params;
+ protected LockRecorderProcessor recorder = new LockRecorderProcessor();
public void testFileRoute() throws Exception {
MockEndpoint result = getMockEndpoint("mock:result");
@@ -39,7 +46,8 @@
@Override
protected void setUp() throws Exception {
- deleteDirectory("target/test-default-inbox");
+ deleteDirectory(targetdir);
+ uri = "file:" + targetdir + params;
super.setUp();
}
@@ -47,8 +55,20 @@
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
- from(uri).to("mock:result");
+ from(uri).process(recorder).to("mock:result");
}
};
}
+
+ public class LockRecorderProcessor implements Processor {
+ private ConcurrentLinkedQueue<String> locks = new ConcurrentLinkedQueue<String>();
+
+ public ConcurrentLinkedQueue<String> getLocks() {
+ return locks;
+ }
+
+ public void process(Exchange exchange) {
+ locks.add(exchange.getProperty("org.apache.camel.file.lock.name", String.class));
+ }
+ }
}