You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/09/28 13:22:09 UTC

svn commit: r1176825 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/model/ camel-core/src/main/java/org/apache/camel/processor/idempotent/ camel-core/src/test/java/org/apache/camel/processor/ components/camel-spring/src/test/java/org/apac...

Author: davsclaus
Date: Wed Sep 28 11:22:08 2011
New Revision: 1176825

URL: http://svn.apache.org/viewvc?rev=1176825&view=rev
Log:
CAMEL-4496: Added removeOnFailure option to idempotent consumer EIP. Thanks to Ioannis for the patch.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerRemoveOnFailureTest.java
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerRemoveOnFailureTest.java
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerRemoveOnFailureTest.xml
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java?rev=1176825&r1=1176824&r2=1176825&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java Wed Sep 28 11:22:08 2011
@@ -31,8 +31,6 @@ import org.apache.camel.util.ObjectHelpe
 
 /**
  * Represents an XML <idempotentConsumer/> element
- *
- * @version 
  */
 @XmlRootElement(name = "idempotentConsumer")
 @XmlAccessorType(XmlAccessType.FIELD)
@@ -43,6 +41,8 @@ public class IdempotentConsumerDefinitio
     private Boolean eager;
     @XmlAttribute
     private Boolean skipDuplicate;
+    @XmlAttribute
+    private Boolean removeOnFailure;
     @XmlTransient
     private IdempotentRepository<?> idempotentRepository;
 
@@ -58,7 +58,7 @@ public class IdempotentConsumerDefinitio
     public String toString() {
         return "IdempotentConsumer[" + getExpression() + " -> " + getOutputs() + "]";
     }
-    
+
     @Override
     public String getLabel() {
         return "idempotentConsumer[" + getExpression() + "]";
@@ -68,25 +68,25 @@ public class IdempotentConsumerDefinitio
     public String getShortName() {
         return "idempotentConsumer";
     }
-    
+
     // Fluent API
     //-------------------------------------------------------------------------
 
     /**
      * Sets the reference name of the message id repository
      *
-     * @param messageIdRepositoryRef  the reference name of message id repository
+     * @param messageIdRepositoryRef the reference name of message id repository
      * @return builder
      */
     public IdempotentConsumerDefinition messageIdRepositoryRef(String messageIdRepositoryRef) {
         setMessageIdRepositoryRef(messageIdRepositoryRef);
         return this;
     }
-    
+
     /**
      * Sets the the message id repository for the idempotent consumer
      *
-     * @param idempotentRepository  the repository instance of idempotent
+     * @param idempotentRepository the repository instance of idempotent
      * @return builder
      */
     public IdempotentConsumerDefinition messageIdRepository(IdempotentRepository<?> idempotentRepository) {
@@ -98,8 +98,8 @@ public class IdempotentConsumerDefinitio
      * Sets whether to eagerly add the key to the idempotent repository or wait until the exchange
      * is complete. Eager is default enabled.
      *
-     * @param eager  <tt>true</tt> to add the key before processing, <tt>false</tt> to wait until
-     * the exchange is complete.
+     * @param eager <tt>true</tt> to add the key before processing, <tt>false</tt> to wait until
+     *              the exchange is complete.
      * @return builder
      */
     public IdempotentConsumerDefinition eager(boolean eager) {
@@ -108,6 +108,20 @@ public class IdempotentConsumerDefinitio
     }
 
     /**
+     * Sets whether to remove or keep the key on failure.
+     * <p/>
+     * The default behavior is to remove the key on failure.
+     *
+     * @param removeOnFailure <tt>true</tt> to remove the key, <tt>false</tt> to keep the key
+     *                        if the exchange fails.
+     * @return builder
+     */
+    public IdempotentConsumerDefinition removeOnFailure(boolean removeOnFailure) {
+        setRemoveOnFailure(removeOnFailure);
+        return this;
+    }
+
+    /**
      * Sets whether to skip duplicates or not.
      * <p/>
      * The default behavior is to skip duplicates.
@@ -115,7 +129,7 @@ public class IdempotentConsumerDefinitio
      * A duplicate message would have the Exchange property {@link org.apache.camel.Exchange#DUPLICATE_MESSAGE} set
      * to a {@link Boolean#TRUE} value. A none duplicate message will not have this property set.
      *
-     * @param skipDuplicate  <tt>true</tt> to skip duplicates, <tt>false</tt> to allow duplicates.
+     * @param skipDuplicate <tt>true</tt> to skip duplicates, <tt>false</tt> to allow duplicates.
      * @return builder
      */
     public IdempotentConsumerDefinition skipDuplicate(boolean skipDuplicate) {
@@ -165,13 +179,27 @@ public class IdempotentConsumerDefinitio
         return skipDuplicate != null ? skipDuplicate : true;
     }
 
+    public Boolean getRemoveOnFailure() {
+        return removeOnFailure;
+    }
+
+    public void setRemoveOnFailure(Boolean removeOnFailure) {
+        this.removeOnFailure = removeOnFailure;
+    }
+
+    public boolean isRemoveOnFailure() {
+        // defaults to true if not configured
+        return removeOnFailure != null ? removeOnFailure : true;
+    }
+
+
     @Override
     @SuppressWarnings("unchecked")
     public Processor createProcessor(RouteContext routeContext) throws Exception {
         Processor childProcessor = this.createChildProcessor(routeContext, true);
 
         IdempotentRepository<String> idempotentRepository =
-            (IdempotentRepository<String>) resolveMessageIdRepository(routeContext);
+                (IdempotentRepository<String>) resolveMessageIdRepository(routeContext);
         ObjectHelper.notNull(idempotentRepository, "idempotentRepository", this);
 
         // add as service to CamelContext so we can managed it and it ensures it will be shutdown when camel shutdowns
@@ -179,13 +207,13 @@ public class IdempotentConsumerDefinitio
 
         Expression expression = getExpression().createExpression(routeContext);
 
-        return new IdempotentConsumer(expression, idempotentRepository, isEager(), isSkipDuplicate(), childProcessor);
+        return new IdempotentConsumer(expression, idempotentRepository, isEager(), isSkipDuplicate(), isRemoveOnFailure(), childProcessor);
     }
 
     /**
      * Strategy method to resolve the {@link org.apache.camel.spi.IdempotentRepository} to use
      *
-     * @param routeContext  route context
+     * @param routeContext route context
      * @return the repository
      */
     protected IdempotentRepository<?> resolveMessageIdRepository(RouteContext routeContext) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java?rev=1176825&r1=1176824&r2=1176825&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java Wed Sep 28 11:22:08 2011
@@ -36,8 +36,6 @@ import org.slf4j.LoggerFactory;
 /**
  * An implementation of the <a
  * href="http://camel.apache.org/idempotent-consumer.html">Idempotent Consumer</a> pattern.
- * 
- * @version 
  */
 public class IdempotentConsumer extends ServiceSupport implements AsyncProcessor, Navigate<Processor> {
     private static final transient Logger LOG = LoggerFactory.getLogger(IdempotentConsumer.class);
@@ -46,13 +44,15 @@ public class IdempotentConsumer extends 
     private final IdempotentRepository<String> idempotentRepository;
     private final boolean eager;
     private final boolean skipDuplicate;
+    private final boolean removeOnFailure;
 
     public IdempotentConsumer(Expression messageIdExpression, IdempotentRepository<String> idempotentRepository,
-                              boolean eager, boolean skipDuplicate, Processor processor) {
+                              boolean eager, boolean skipDuplicate, boolean removeOnFailure, Processor processor) {
         this.messageIdExpression = messageIdExpression;
         this.idempotentRepository = idempotentRepository;
         this.eager = eager;
         this.skipDuplicate = skipDuplicate;
+        this.removeOnFailure = removeOnFailure;
         this.processor = AsyncProcessorConverterHelper.convert(processor);
     }
 
@@ -98,7 +98,7 @@ public class IdempotentConsumer extends 
         }
 
         // register our on completion callback
-        exchange.addOnCompletion(new IdempotentOnCompletion(idempotentRepository, messageId, eager));
+        exchange.addOnCompletion(new IdempotentOnCompletion(idempotentRepository, messageId, eager, removeOnFailure));
 
         // process the exchange
         return processor.process(exchange, callback);
@@ -145,8 +145,8 @@ public class IdempotentConsumer extends 
     /**
      * A strategy method to allow derived classes to overload the behaviour of
      * processing a duplicate message
-     * 
-     * @param exchange the exchange
+     *
+     * @param exchange  the exchange
      * @param messageId the message ID of this exchange
      */
     protected void onDuplicateMessage(Exchange exchange, String messageId) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java?rev=1176825&r1=1176824&r2=1176825&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java Wed Sep 28 11:22:08 2011
@@ -28,19 +28,19 @@ import org.slf4j.LoggerFactory;
  * <p/>
  * This strategy adds the message id to the idempotent repository in cast the exchange
  * was processed successfully. In case of failure the message id is <b>not</b> added.
- *
- * @version 
  */
 public class IdempotentOnCompletion implements Synchronization {
     private static final transient Logger LOG = LoggerFactory.getLogger(IdempotentOnCompletion.class);
     private final IdempotentRepository<String> idempotentRepository;
     private final String messageId;
     private final boolean eager;
+    private final boolean removeOnFailure;
 
-    public IdempotentOnCompletion(IdempotentRepository<String> idempotentRepository, String messageId, boolean eager) {
+    public IdempotentOnCompletion(IdempotentRepository<String> idempotentRepository, String messageId, boolean eager, boolean removeOnFailure) {
         this.idempotentRepository = idempotentRepository;
         this.messageId = messageId;
         this.eager = eager;
+        this.removeOnFailure = removeOnFailure;
     }
 
     public void onComplete(Exchange exchange) {
@@ -61,7 +61,7 @@ public class IdempotentOnCompletion impl
      * A strategy method to allow derived classes to overload the behavior of
      * processing a completed message
      *
-     * @param exchange the exchange
+     * @param exchange  the exchange
      * @param messageId the message ID of this exchange
      */
     protected void onCompletedMessage(Exchange exchange, String messageId) {
@@ -76,12 +76,14 @@ public class IdempotentOnCompletion impl
      * A strategy method to allow derived classes to overload the behavior of
      * processing a failed message
      *
-     * @param exchange the exchange
+     * @param exchange  the exchange
      * @param messageId the message ID of this exchange
      */
     protected void onFailedMessage(Exchange exchange, String messageId) {
-        idempotentRepository.remove(messageId);
-        LOG.debug("Removed from repository as exchange failed: {} with id: {}", exchange, messageId);
+        if (removeOnFailure) {
+            idempotentRepository.remove(messageId);
+            LOG.debug("Removed from repository as exchange failed: {} with id: {}", exchange, messageId);
+        }
     }
 
     @Override

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerRemoveOnFailureTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerRemoveOnFailureTest.java?rev=1176825&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerRemoveOnFailureTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerRemoveOnFailureTest.java Wed Sep 28 11:22:08 2011
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
+
+/**
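+ * Tests the idempotent consumer with removeOnFailure set to false, so the key of a failed exchange is kept in the repository.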
+ */
+public class IdempotentConsumerRemoveOnFailureTest extends ContextTestSupport {
+    protected Endpoint startEndpoint;
+    protected MockEndpoint resultEndpoint;
+
+    public void testFailedExchangesNotRemoved() throws Exception {
+        resultEndpoint.expectedBodiesReceived("one", "three");
+
+        sendMessage("1", "one");
+        sendMessage("2", "two");
+        sendMessage("1", "one");
+        sendMessage("2", "two");
+        sendMessage("1", "one");
+        sendMessage("3", "three");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").idempotentConsumer(header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200))
+                        // in case of a failure we still want the message to be regarded as a duplicate, so we set the option to false
+                        .removeOnFailure(false)
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                String id = exchange.getIn().getHeader("messageId", String.class);
+                                if (id.equals("2")) {
+                                    throw new IllegalArgumentException("Damn I cannot handle id 2");
+                                }
+                            }
+                        }).to("mock:result");
+            }
+        };
+    }
+
+    protected void sendMessage(final Object messageId, final Object body) {
+        template.send(startEndpoint, new Processor() {
+            public void process(Exchange exchange) {
+                // now let's fire in a message
+                Message in = exchange.getIn();
+                in.setBody(body);
+                in.setHeader("messageId", messageId);
+            }
+        });
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        startEndpoint = resolveMandatoryEndpoint("direct:start");
+        resultEndpoint = getMockEndpoint("mock:result");
+    }
+
+}
\ No newline at end of file

Added: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerRemoveOnFailureTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerRemoveOnFailureTest.java?rev=1176825&view=auto
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerRemoveOnFailureTest.java (added)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerRemoveOnFailureTest.java Wed Sep 28 11:22:08 2011
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.spring.processor;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.processor.IdempotentConsumerRemoveOnFailureTest;
+
+import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
+
+public class SpringIdempotentConsumerRemoveOnFailureTest extends IdempotentConsumerRemoveOnFailureTest {
+
+    protected CamelContext createCamelContext() throws Exception {
+        return createSpringCamelContext(this, "org/apache/camel/spring/processor/SpringIdempotentConsumerRemoveOnFailureTest.xml");
+    }
+
+}

Added: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerRemoveOnFailureTest.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerRemoveOnFailureTest.xml?rev=1176825&view=auto
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerRemoveOnFailureTest.xml (added)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerRemoveOnFailureTest.xml Wed Sep 28 11:22:08 2011
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
+
+    <bean id="myRepo" class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/>
+    <bean id="exception" class="java.lang.IllegalArgumentException"/>
+
+    <camelContext xmlns="http://camel.apache.org/schema/spring">
+        <route>
+            <from uri="direct:start"/>
+            <!-- in case of a failure, we still want the message to be regarded as a duplicate, so removeOnFailure is set to false -->
+            <idempotentConsumer messageIdRepositoryRef="myRepo" removeOnFailure="false" >
+                <!-- use the messageId header as key for identifying duplicate messages -->
+                <header>messageId</header>
+                <choice>
+                    <when>
+                        <simple>${header.messageId} == 2</simple>
+                        <throwException ref="exception"/>
+                    </when>
+                    <otherwise>
+                        <to uri="mock:result"/>
+                    </otherwise>
+                </choice>
+            </idempotentConsumer>
+        </route>
+    </camelContext>
+
+</beans>