You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/07/01 11:07:45 UTC

svn commit: r959562 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java test/java/org/apache/camel/processor/async/AsyncEndpointIdempotentConsumerTest.java

Author: davsclaus
Date: Thu Jul  1 09:07:45 2010
New Revision: 959562

URL: http://svn.apache.org/viewvc?rev=959562&view=rev
Log:
CAMEL-2886: idempotent consumer supports async routing engine

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointIdempotentConsumerTest.java
      - copied, changed from r959549, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java?rev=959562&r1=959561&r2=959562&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java Thu Jul  1 09:07:45 2010
@@ -19,12 +19,16 @@ package org.apache.camel.processor.idemp
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.spi.IdempotentRepository;
+import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,21 +39,19 @@ import org.apache.commons.logging.LogFac
  * 
  * @version $Revision$
  */
-public class IdempotentConsumer extends ServiceSupport implements Processor, Navigate<Processor> {
+public class IdempotentConsumer extends ServiceSupport implements AsyncProcessor, Navigate<Processor> {
     private static final transient Log LOG = LogFactory.getLog(IdempotentConsumer.class);
     private final Expression messageIdExpression;
-    private final Processor processor;
+    private final AsyncProcessor processor;
     private final IdempotentRepository<String> idempotentRepository;
     private final boolean eager;
 
-    // TODO: should support async routing engine
-
-    public IdempotentConsumer(Expression messageIdExpression, 
-            IdempotentRepository<String> idempotentRepository, boolean eager, Processor processor) {
+    public IdempotentConsumer(Expression messageIdExpression, IdempotentRepository<String> idempotentRepository,
+                              boolean eager, Processor processor) {
         this.messageIdExpression = messageIdExpression;
         this.idempotentRepository = idempotentRepository;
         this.eager = eager;
-        this.processor = processor;
+        this.processor = AsyncProcessorTypeConverter.convert(processor);
     }
 
     @Override
@@ -58,6 +60,10 @@ public class IdempotentConsumer extends 
     }
 
     public void process(Exchange exchange) throws Exception {
+        AsyncProcessorHelper.process(this, exchange);
+    }
+
+    public boolean process(Exchange exchange, AsyncCallback callback) {
         final String messageId = messageIdExpression.evaluate(exchange, String.class);
         if (messageId == null) {
             throw new NoMessageIdException(exchange, messageIdExpression);
@@ -75,14 +81,15 @@ public class IdempotentConsumer extends 
         if (!newKey) {
             // we already have this key so its a duplicate message
             onDuplicateMessage(exchange, messageId);
-            return;
+            callback.done(true);
+            return true;
         }
 
         // register our on completion callback
         exchange.addOnCompletion(new IdempotentOnCompletion(idempotentRepository, messageId, eager));
 
         // process the exchange
-        processor.process(exchange);
+        return processor.process(exchange, callback);
     }
 
     public List<Processor> next() {

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointIdempotentConsumerTest.java (from r959549, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointIdempotentConsumerTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointIdempotentConsumerTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java&r1=959549&r2=959562&rev=959562&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointIdempotentConsumerTest.java Thu Jul  1 09:07:45 2010
@@ -20,22 +20,33 @@ import org.apache.camel.ContextTestSuppo
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
 
 /**
  * @version $Revision$
  */
-public class AsyncEndpointTest extends ContextTestSupport {
+public class AsyncEndpointIdempotentConsumerTest extends ContextTestSupport {
 
     private static String beforeThreadName;
     private static String afterThreadName;
 
     public void testAsyncEndpoint() throws Exception {
-        getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
-        getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
-        getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
+        getMockEndpoint("mock:before").expectedBodiesReceived("A", "B", "C");
 
-        String reply = template.requestBody("direct:start", "Hello Camel", String.class);
-        assertEquals("Bye Camel", reply);
+        MockEndpoint after = getMockEndpoint("mock:after");
+        after.expectedBodiesReceived("Bye Camel", "Bye Camel");
+        after.message(0).header("myId").isEqualTo(123);
+        after.message(1).header("myId").isEqualTo(456);
+
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedBodiesReceived("Bye Camel", "Bye Camel");
+        result.message(0).header("myId").isEqualTo(123);
+        result.message(1).header("myId").isEqualTo(456);
+
+        template.sendBodyAndHeader("direct:start", "A", "myId", 123);
+        template.sendBodyAndHeader("direct:start", "B", "myId", 123);
+        template.sendBodyAndHeader("direct:start", "C", "myId", 456);
 
         assertMockEndpointsSatisfied();
 
@@ -57,6 +68,7 @@ public class AsyncEndpointTest extends C
                                 beforeThreadName = Thread.currentThread().getName();
                             }
                         })
+                        .idempotentConsumer(header("myId"), MemoryIdempotentRepository.memoryIdempotentRepository(200))
                         .to("async:Bye Camel")
                         .process(new Processor() {
                             public void process(Exchange exchange) throws Exception {
@@ -70,4 +82,4 @@ public class AsyncEndpointTest extends C
         };
     }
 
-}
+}
\ No newline at end of file