You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/08/07 12:07:05 UTC
svn commit: r801928 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/component/file/
main/java/org/apache/camel/component/file/strategy/
test/java/org/apache/camel/component/file/
Author: davsclaus
Date: Fri Aug 7 10:07:05 2009
New Revision: 801928
URL: http://svn.apache.org/viewvc?rev=801928&view=rev
Log:
CAMEL-1887: Added option moveFailed to file component to move failed files to another location instead of leaving them untouched.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerMoveAndMoveFailureTest.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerMoveFailureTest.java (with props)
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategyFactory.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java?rev=801928&r1=801927&r2=801928&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java Fri Aug 7 10:07:05 2009
@@ -68,6 +68,7 @@
protected String exclude;
protected Expression fileName;
protected Expression move;
+ protected Expression moveFailed;
protected Expression preMove;
protected boolean idempotent;
protected IdempotentRepository idempotentRepository;
@@ -207,6 +208,23 @@
}
/**
+ * Sets the move failure expression based on
+ * {@link org.apache.camel.language.simple.FileLanguage}
+ */
+ public void setMoveFailed(String fileLanguageExpression) {
+ String expression = configureMoveOrPreMoveExpression(fileLanguageExpression);
+ this.moveFailed = createFileLangugeExpression(expression);
+ }
+
+ public Expression getMoveFailed() {
+ return moveFailed;
+ }
+
+ public void setMoveFailed(Expression moveFailed) {
+ this.moveFailed = moveFailed;
+ }
+
+ /**
* Sets the move expression based on
* {@link org.apache.camel.language.simple.FileLanguage}
*/
@@ -467,6 +485,9 @@
if (move != null) {
params.put("move", move);
}
+ if (moveFailed != null) {
+ params.put("moveFailed", moveFailed);
+ }
if (preMove != null) {
params.put("preMove", preMove);
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java?rev=801928&r1=801927&r2=801928&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java Fri Aug 7 10:07:05 2009
@@ -133,8 +133,12 @@
*/
protected void processStrategyRollback(GenericFileProcessStrategy<T> processStrategy,
Exchange exchange, GenericFile<T> file) {
- if (log.isWarnEnabled()) {
- log.warn("Rolling back remote file strategy: " + processStrategy + " for file: " + file);
+
+ // only WARN if we do not handle the failure ourselves by moving the failed file
+ if (endpoint.getMoveFailed() == null) {
+ if (log.isWarnEnabled()) {
+ log.warn("Rolling back remote file strategy: " + processStrategy + " for file: " + file);
+ }
}
try {
processStrategy.rollback(operations, endpoint, exchange, file);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java?rev=801928&r1=801927&r2=801928&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java Fri Aug 7 10:07:05 2009
@@ -34,10 +34,12 @@
public static GenericFileProcessStrategy createGenericFileProcessStrategy(CamelContext context, Map<String, Object> params) {
// We assume an option is present only if its value is not null (for a String) or 'true' (for a boolean)
- boolean isNoop = params.get("noop") != null;
- boolean isDelete = params.get("delete") != null;
Expression moveExpression = (Expression) params.get("move");
+ Expression moveFailedExpression = (Expression) params.get("moveFailed");
Expression preMoveExpression = (Expression) params.get("preMove");
+ boolean isNoop = params.get("noop") != null;
+ boolean isDelete = params.get("delete") != null;
+ boolean isMove = moveExpression != null || preMoveExpression != null || moveFailedExpression != null;
if (isNoop) {
GenericFileNoOpProcessStrategy<File> strategy = new GenericFileNoOpProcessStrategy<File>();
@@ -47,32 +49,43 @@
GenericFileDeleteProcessStrategy<File> strategy = new GenericFileDeleteProcessStrategy<File>();
strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
return strategy;
- } else if (moveExpression != null || preMoveExpression != null) {
+ } else if (isMove) {
GenericFileRenameProcessStrategy<File> strategy = new GenericFileRenameProcessStrategy<File>();
strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
if (moveExpression != null) {
GenericFileExpressionRenamer<File> renamer = new GenericFileExpressionRenamer<File>();
renamer.setExpression(moveExpression);
strategy.setCommitRenamer(renamer);
+ } else {
+ strategy.setCommitRenamer(getDefaultCommitRenamer(context));
}
if (preMoveExpression != null) {
GenericFileExpressionRenamer<File> renamer = new GenericFileExpressionRenamer<File>();
renamer.setExpression(preMoveExpression);
strategy.setBeginRenamer(renamer);
}
+ if (moveFailedExpression != null) {
+ GenericFileExpressionRenamer<File> renamer = new GenericFileExpressionRenamer<File>();
+ renamer.setExpression(moveFailedExpression);
+ strategy.setFailureRenamer(renamer);
+ }
return strategy;
} else {
// default strategy will move files into a .camel/ subfolder of the directory the file was consumed from
GenericFileRenameProcessStrategy<File> strategy = new GenericFileRenameProcessStrategy<File>();
strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
- // use context to lookup language to let it be loose coupled
- Language language = context.resolveLanguage("file");
- Expression expression = language.createExpression("${file:parent}/.camel/${file:onlyname}");
- strategy.setCommitRenamer(new GenericFileExpressionRenamer<File>(expression));
+ strategy.setCommitRenamer(getDefaultCommitRenamer(context));
return strategy;
}
}
+ private static GenericFileExpressionRenamer<File> getDefaultCommitRenamer(CamelContext context) {
+ // use the context to look up the language so it stays loosely coupled
+ Language language = context.resolveLanguage("file");
+ Expression expression = language.createExpression("${file:parent}/.camel/${file:onlyname}");
+ return new GenericFileExpressionRenamer<File>(expression);
+ }
+
@SuppressWarnings("unchecked")
private static GenericFileExclusiveReadLockStrategy<File> getExclusiveReadLockStrategy(Map<String, Object> params) {
GenericFileExclusiveReadLockStrategy strategy = (GenericFileExclusiveReadLockStrategy) params.get("exclusiveReadLockStrategy");
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategyFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategyFactory.java?rev=801928&r1=801927&r2=801928&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategyFactory.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategyFactory.java Fri Aug 7 10:07:05 2009
@@ -33,10 +33,12 @@
public static GenericFileProcessStrategy createGenericFileProcessStrategy(CamelContext context, Map<String, Object> params) {
// We assume an option is present only if its value is not null (for a String) or 'true' (for a boolean)
- boolean isNoop = params.get("noop") != null;
- boolean isDelete = params.get("delete") != null;
Expression moveExpression = (Expression) params.get("move");
+ Expression moveFailedExpression = (Expression) params.get("moveFailed");
Expression preMoveExpression = (Expression) params.get("preMove");
+ boolean isNoop = params.get("noop") != null;
+ boolean isDelete = params.get("delete") != null;
+ boolean isMove = moveExpression != null || preMoveExpression != null || moveFailedExpression != null;
if (isNoop) {
GenericFileNoOpProcessStrategy strategy = new GenericFileNoOpProcessStrategy();
@@ -46,7 +48,7 @@
GenericFileDeleteProcessStrategy strategy = new GenericFileDeleteProcessStrategy();
strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
return strategy;
- } else if (moveExpression != null || preMoveExpression != null) {
+ } else if (isMove) {
GenericFileRenameProcessStrategy strategy = new GenericFileRenameProcessStrategy();
strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
if (moveExpression != null) {
@@ -54,6 +56,11 @@
renamer.setExpression(moveExpression);
strategy.setCommitRenamer(renamer);
}
+ if (moveFailedExpression != null) {
+ GenericFileExpressionRenamer renamer = new GenericFileExpressionRenamer();
+ renamer.setExpression(moveFailedExpression);
+ strategy.setFailureRenamer(renamer);
+ }
if (preMoveExpression != null) {
GenericFileExpressionRenamer renamer = new GenericFileExpressionRenamer();
renamer.setExpression(preMoveExpression);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java?rev=801928&r1=801927&r2=801928&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java Fri Aug 7 10:07:05 2009
@@ -26,6 +26,7 @@
public class GenericFileRenameProcessStrategy<T> extends GenericFileProcessStrategySupport<T> {
private GenericFileRenamer<T> beginRenamer;
+ private GenericFileRenamer<T> failureRenamer;
private GenericFileRenamer<T> commitRenamer;
public GenericFileRenameProcessStrategy() {
@@ -51,6 +52,17 @@
}
@Override
+ public void rollback(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
+ // must invoke super
+ super.rollback(operations, endpoint, exchange, file);
+
+ if (failureRenamer != null) {
+ GenericFile<T> newName = failureRenamer.renameFile(exchange, file);
+ renameFile(operations, file, newName);
+ }
+ }
+
+ @Override
public void commit(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
// must invoke super
super.commit(operations, endpoint, exchange, file);
@@ -103,4 +115,11 @@
this.commitRenamer = commitRenamer;
}
+ public GenericFileRenamer<T> getFailureRenamer() {
+ return failureRenamer;
+ }
+
+ public void setFailureRenamer(GenericFileRenamer<T> failureRenamer) {
+ this.failureRenamer = failureRenamer;
+ }
}
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerMoveAndMoveFailureTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerMoveAndMoveFailureTest.java?rev=801928&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerMoveAndMoveFailureTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerMoveAndMoveFailureTest.java Fri Aug 7 10:07:05 2009
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version $Revision$
+ */
+public class FileConsumerMoveAndMoveFailureTest extends ContextTestSupport {
+
+ @Override
+ protected void setUp() throws Exception {
+ deleteDirectory("target/failed");
+ super.setUp();
+ }
+
+ public void testMoveAndMoveFailed() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hello World");
+
+ mock.expectedFileExists("target/failed/moved/hello.txt", "Hello World");
+ mock.expectedFileExists("target/failed/error/bye-error.txt", "Kabom");
+
+ template.sendBodyAndHeader("file://target/failed", "Hello World", Exchange.FILE_NAME, "hello.txt");
+ template.sendBodyAndHeader("file://target/failed", "Kabom", Exchange.FILE_NAME, "bye.txt");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("file://target/failed?move=moved&moveFailed=error/${file:name.noext}-error.txt")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ String body = exchange.getIn().getBody(String.class);
+ if ("Kabom".equals(body)) {
+ throw new IllegalArgumentException("Forced");
+ }
+ }
+ }).to("mock:result");
+ }
+ };
+ }
+}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerMoveAndMoveFailureTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerMoveAndMoveFailureTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerMoveFailureTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerMoveFailureTest.java?rev=801928&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerMoveFailureTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerMoveFailureTest.java Fri Aug 7 10:07:05 2009
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version $Revision$
+ */
+public class FileConsumerMoveFailureTest extends ContextTestSupport {
+
+ @Override
+ protected void setUp() throws Exception {
+ deleteDirectory("target/failed");
+ super.setUp();
+ }
+
+ public void testMoveFailed() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hello World");
+
+ mock.expectedFileExists("target/failed/.camel/hello.txt", "Hello World");
+ mock.expectedFileExists("target/failed/error/bye-error.txt", "Kabom");
+
+ template.sendBodyAndHeader("file://target/failed", "Hello World", Exchange.FILE_NAME, "hello.txt");
+ template.sendBodyAndHeader("file://target/failed", "Kabom", Exchange.FILE_NAME, "bye.txt");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("file://target/failed?moveFailed=error/${file:name.noext}-error.txt")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ String body = exchange.getIn().getBody(String.class);
+ if ("Kabom".equals(body)) {
+ throw new IllegalArgumentException("Forced");
+ }
+ }
+ }).to("mock:result");
+ }
+ };
+ }
+}
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerMoveFailureTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerMoveFailureTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date