You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/09/28 13:22:09 UTC
svn commit: r1176825 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/model/
camel-core/src/main/java/org/apache/camel/processor/idempotent/
camel-core/src/test/java/org/apache/camel/processor/
components/camel-spring/src/test/java/org/apac...
Author: davsclaus
Date: Wed Sep 28 11:22:08 2011
New Revision: 1176825
URL: http://svn.apache.org/viewvc?rev=1176825&view=rev
Log:
CAMEL-4496: Added removeOnFailure option to idempotent consumer EIP. Thanks to Ioannis for the patch.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerRemoveOnFailureTest.java
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerRemoveOnFailureTest.java
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerRemoveOnFailureTest.xml
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java?rev=1176825&r1=1176824&r2=1176825&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java Wed Sep 28 11:22:08 2011
@@ -31,8 +31,6 @@ import org.apache.camel.util.ObjectHelpe
/**
* Represents an XML <idempotentConsumer/> element
- *
- * @version
*/
@XmlRootElement(name = "idempotentConsumer")
@XmlAccessorType(XmlAccessType.FIELD)
@@ -43,6 +41,8 @@ public class IdempotentConsumerDefinitio
private Boolean eager;
@XmlAttribute
private Boolean skipDuplicate;
+ @XmlAttribute
+ private Boolean removeOnFailure;
@XmlTransient
private IdempotentRepository<?> idempotentRepository;
@@ -58,7 +58,7 @@ public class IdempotentConsumerDefinitio
public String toString() {
return "IdempotentConsumer[" + getExpression() + " -> " + getOutputs() + "]";
}
-
+
@Override
public String getLabel() {
return "idempotentConsumer[" + getExpression() + "]";
@@ -68,25 +68,25 @@ public class IdempotentConsumerDefinitio
public String getShortName() {
return "idempotentConsumer";
}
-
+
// Fluent API
//-------------------------------------------------------------------------
/**
* Sets the reference name of the message id repository
*
- * @param messageIdRepositoryRef the reference name of message id repository
+ * @param messageIdRepositoryRef the reference name of message id repository
* @return builder
*/
public IdempotentConsumerDefinition messageIdRepositoryRef(String messageIdRepositoryRef) {
setMessageIdRepositoryRef(messageIdRepositoryRef);
return this;
}
-
+
/**
* Sets the message id repository for the idempotent consumer
*
- * @param idempotentRepository the repository instance of idempotent
+ * @param idempotentRepository the repository instance of idempotent
* @return builder
*/
public IdempotentConsumerDefinition messageIdRepository(IdempotentRepository<?> idempotentRepository) {
@@ -98,8 +98,8 @@ public class IdempotentConsumerDefinitio
* Sets whether to eagerly add the key to the idempotent repository or wait until the exchange
* is complete. Eager is default enabled.
*
- * @param eager <tt>true</tt> to add the key before processing, <tt>false</tt> to wait until
- * the exchange is complete.
+ * @param eager <tt>true</tt> to add the key before processing, <tt>false</tt> to wait until
+ * the exchange is complete.
* @return builder
*/
public IdempotentConsumerDefinition eager(boolean eager) {
@@ -108,6 +108,20 @@ public class IdempotentConsumerDefinitio
}
/**
+ * Sets whether to remove or keep the key on failure.
+ * <p/>
+ * The default behavior is to remove the key on failure.
+ *
+ * @param removeOnFailure <tt>true</tt> to remove the key, <tt>false</tt> to keep the key
+ * if the exchange fails.
+ * @return builder
+ */
+ public IdempotentConsumerDefinition removeOnFailure(boolean removeOnFailure) {
+ setRemoveOnFailure(removeOnFailure);
+ return this;
+ }
+
+ /**
* Sets whether to skip duplicates or not.
* <p/>
* The default behavior is to skip duplicates.
@@ -115,7 +129,7 @@ public class IdempotentConsumerDefinitio
* A duplicate message would have the Exchange property {@link org.apache.camel.Exchange#DUPLICATE_MESSAGE} set
* to a {@link Boolean#TRUE} value. A non-duplicate message will not have this property set.
*
- * @param skipDuplicate <tt>true</tt> to skip duplicates, <tt>false</tt> to allow duplicates.
+ * @param skipDuplicate <tt>true</tt> to skip duplicates, <tt>false</tt> to allow duplicates.
* @return builder
*/
public IdempotentConsumerDefinition skipDuplicate(boolean skipDuplicate) {
@@ -165,13 +179,27 @@ public class IdempotentConsumerDefinitio
return skipDuplicate != null ? skipDuplicate : true;
}
+ public Boolean getRemoveOnFailure() {
+ return removeOnFailure;
+ }
+
+ public void setRemoveOnFailure(Boolean removeOnFailure) {
+ this.removeOnFailure = removeOnFailure;
+ }
+
+ public boolean isRemoveOnFailure() {
+ // defaults to true if not configured
+ return removeOnFailure != null ? removeOnFailure : true;
+ }
+
+
@Override
@SuppressWarnings("unchecked")
public Processor createProcessor(RouteContext routeContext) throws Exception {
Processor childProcessor = this.createChildProcessor(routeContext, true);
IdempotentRepository<String> idempotentRepository =
- (IdempotentRepository<String>) resolveMessageIdRepository(routeContext);
+ (IdempotentRepository<String>) resolveMessageIdRepository(routeContext);
ObjectHelper.notNull(idempotentRepository, "idempotentRepository", this);
// add as service to CamelContext so we can manage it and ensure it is shut down when Camel shuts down
@@ -179,13 +207,13 @@ public class IdempotentConsumerDefinitio
Expression expression = getExpression().createExpression(routeContext);
- return new IdempotentConsumer(expression, idempotentRepository, isEager(), isSkipDuplicate(), childProcessor);
+ return new IdempotentConsumer(expression, idempotentRepository, isEager(), isSkipDuplicate(), isRemoveOnFailure(), childProcessor);
}
/**
* Strategy method to resolve the {@link org.apache.camel.spi.IdempotentRepository} to use
*
- * @param routeContext route context
+ * @param routeContext route context
* @return the repository
*/
protected IdempotentRepository<?> resolveMessageIdRepository(RouteContext routeContext) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java?rev=1176825&r1=1176824&r2=1176825&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java Wed Sep 28 11:22:08 2011
@@ -36,8 +36,6 @@ import org.slf4j.LoggerFactory;
/**
* An implementation of the <a
* href="http://camel.apache.org/idempotent-consumer.html">Idempotent Consumer</a> pattern.
- *
- * @version
*/
public class IdempotentConsumer extends ServiceSupport implements AsyncProcessor, Navigate<Processor> {
private static final transient Logger LOG = LoggerFactory.getLogger(IdempotentConsumer.class);
@@ -46,13 +44,15 @@ public class IdempotentConsumer extends
private final IdempotentRepository<String> idempotentRepository;
private final boolean eager;
private final boolean skipDuplicate;
+ private final boolean removeOnFailure;
public IdempotentConsumer(Expression messageIdExpression, IdempotentRepository<String> idempotentRepository,
- boolean eager, boolean skipDuplicate, Processor processor) {
+ boolean eager, boolean skipDuplicate, boolean removeOnFailure, Processor processor) {
this.messageIdExpression = messageIdExpression;
this.idempotentRepository = idempotentRepository;
this.eager = eager;
this.skipDuplicate = skipDuplicate;
+ this.removeOnFailure = removeOnFailure;
this.processor = AsyncProcessorConverterHelper.convert(processor);
}
@@ -98,7 +98,7 @@ public class IdempotentConsumer extends
}
// register our on completion callback
- exchange.addOnCompletion(new IdempotentOnCompletion(idempotentRepository, messageId, eager));
+ exchange.addOnCompletion(new IdempotentOnCompletion(idempotentRepository, messageId, eager, removeOnFailure));
// process the exchange
return processor.process(exchange, callback);
@@ -145,8 +145,8 @@ public class IdempotentConsumer extends
/**
* A strategy method to allow derived classes to overload the behaviour of
* processing a duplicate message
- *
- * @param exchange the exchange
+ *
+ * @param exchange the exchange
* @param messageId the message ID of this exchange
*/
protected void onDuplicateMessage(Exchange exchange, String messageId) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java?rev=1176825&r1=1176824&r2=1176825&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java Wed Sep 28 11:22:08 2011
@@ -28,19 +28,19 @@ import org.slf4j.LoggerFactory;
* <p/>
* This strategy adds the message id to the idempotent repository in case the exchange
* was processed successfully. In case of failure the message id is <b>not</b> added.
- *
- * @version
*/
public class IdempotentOnCompletion implements Synchronization {
private static final transient Logger LOG = LoggerFactory.getLogger(IdempotentOnCompletion.class);
private final IdempotentRepository<String> idempotentRepository;
private final String messageId;
private final boolean eager;
+ private final boolean removeOnFailure;
- public IdempotentOnCompletion(IdempotentRepository<String> idempotentRepository, String messageId, boolean eager) {
+ public IdempotentOnCompletion(IdempotentRepository<String> idempotentRepository, String messageId, boolean eager, boolean removeOnFailure) {
this.idempotentRepository = idempotentRepository;
this.messageId = messageId;
this.eager = eager;
+ this.removeOnFailure = removeOnFailure;
}
public void onComplete(Exchange exchange) {
@@ -61,7 +61,7 @@ public class IdempotentOnCompletion impl
* A strategy method to allow derived classes to overload the behavior of
* processing a completed message
*
- * @param exchange the exchange
+ * @param exchange the exchange
* @param messageId the message ID of this exchange
*/
protected void onCompletedMessage(Exchange exchange, String messageId) {
@@ -76,12 +76,14 @@ public class IdempotentOnCompletion impl
* A strategy method to allow derived classes to overload the behavior of
* processing a failed message
*
- * @param exchange the exchange
+ * @param exchange the exchange
* @param messageId the message ID of this exchange
*/
protected void onFailedMessage(Exchange exchange, String messageId) {
- idempotentRepository.remove(messageId);
- LOG.debug("Removed from repository as exchange failed: {} with id: {}", exchange, messageId);
+ if (removeOnFailure) {
+ idempotentRepository.remove(messageId);
+ LOG.debug("Removed from repository as exchange failed: {} with id: {}", exchange, messageId);
+ }
}
@Override
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerRemoveOnFailureTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerRemoveOnFailureTest.java?rev=1176825&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerRemoveOnFailureTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerRemoveOnFailureTest.java Wed Sep 28 11:22:08 2011
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
+
+/**
+ * @version
+ */
+public class IdempotentConsumerRemoveOnFailureTest extends ContextTestSupport {
+ protected Endpoint startEndpoint;
+ protected MockEndpoint resultEndpoint;
+
+ public void testFailedExchangesNotRemoved() throws Exception {
+ resultEndpoint.expectedBodiesReceived("one", "three");
+
+ sendMessage("1", "one");
+ sendMessage("2", "two");
+ sendMessage("1", "one");
+ sendMessage("2", "two");
+ sendMessage("1", "one");
+ sendMessage("3", "three");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start").idempotentConsumer(header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200))
+ // in case of a failure we still want the message to be regarded as a duplicate, so we set the option to false
+ .removeOnFailure(false)
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ String id = exchange.getIn().getHeader("messageId", String.class);
+ if (id.equals("2")) {
+ throw new IllegalArgumentException("Damn I cannot handle id 2");
+ }
+ }
+ }).to("mock:result");
+ }
+ };
+ }
+
+ protected void sendMessage(final Object messageId, final Object body) {
+ template.send(startEndpoint, new Processor() {
+ public void process(Exchange exchange) {
+ // now let's fire in a message
+ Message in = exchange.getIn();
+ in.setBody(body);
+ in.setHeader("messageId", messageId);
+ }
+ });
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ startEndpoint = resolveMandatoryEndpoint("direct:start");
+ resultEndpoint = getMockEndpoint("mock:result");
+ }
+
+}
\ No newline at end of file
Added: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerRemoveOnFailureTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerRemoveOnFailureTest.java?rev=1176825&view=auto
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerRemoveOnFailureTest.java (added)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerRemoveOnFailureTest.java Wed Sep 28 11:22:08 2011
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.spring.processor;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.processor.IdempotentConsumerRemoveOnFailureTest;
+
+import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
+
+public class SpringIdempotentConsumerRemoveOnFailureTest extends IdempotentConsumerRemoveOnFailureTest {
+
+ protected CamelContext createCamelContext() throws Exception {
+ return createSpringCamelContext(this, "org/apache/camel/spring/processor/SpringIdempotentConsumerRemoveOnFailureTest.xml");
+ }
+
+}
Added: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerRemoveOnFailureTest.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerRemoveOnFailureTest.xml?rev=1176825&view=auto
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerRemoveOnFailureTest.xml (added)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerRemoveOnFailureTest.xml Wed Sep 28 11:22:08 2011
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
+
+ <bean id="myRepo" class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/>
+ <bean id="exception" class="java.lang.IllegalArgumentException"/>
+
+ <camelContext xmlns="http://camel.apache.org/schema/spring">
+ <route>
+ <from uri="direct:start"/>
+ <!-- in case of a failure, we still want the message to be regarded as a duplicate -->
+ <idempotentConsumer messageIdRepositoryRef="myRepo" removeOnFailure="false" >
+ <!-- use the messageId header as key for identifying duplicate messages -->
+ <header>messageId</header>
+ <choice>
+ <when>
+ <simple>${header.messageId} == 2</simple>
+ <throwException ref="exception"/>
+ </when>
+ <otherwise>
+ <to uri="mock:result"/>
+ </otherwise>
+ </choice>
+ </idempotentConsumer>
+ </route>
+ </camelContext>
+
+</beans>