You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/07/01 11:07:45 UTC
svn commit: r959562 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
test/java/org/apache/camel/processor/async/AsyncEndpointIdempotentConsumerTest.java
Author: davsclaus
Date: Thu Jul 1 09:07:45 2010
New Revision: 959562
URL: http://svn.apache.org/viewvc?rev=959562&view=rev
Log:
CAMEL-2886: idempotent consumer supports async routing engine
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointIdempotentConsumerTest.java
- copied, changed from r959549, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java?rev=959562&r1=959561&r2=959562&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java Thu Jul 1 09:07:45 2010
@@ -19,12 +19,16 @@ package org.apache.camel.processor.idemp
import java.util.ArrayList;
import java.util.List;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.spi.IdempotentRepository;
+import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -35,21 +39,19 @@ import org.apache.commons.logging.LogFac
*
* @version $Revision$
*/
-public class IdempotentConsumer extends ServiceSupport implements Processor, Navigate<Processor> {
+public class IdempotentConsumer extends ServiceSupport implements AsyncProcessor, Navigate<Processor> {
private static final transient Log LOG = LogFactory.getLog(IdempotentConsumer.class);
private final Expression messageIdExpression;
- private final Processor processor;
+ private final AsyncProcessor processor;
private final IdempotentRepository<String> idempotentRepository;
private final boolean eager;
- // TODO: should support async routing engine
-
- public IdempotentConsumer(Expression messageIdExpression,
- IdempotentRepository<String> idempotentRepository, boolean eager, Processor processor) {
+ public IdempotentConsumer(Expression messageIdExpression, IdempotentRepository<String> idempotentRepository,
+ boolean eager, Processor processor) {
this.messageIdExpression = messageIdExpression;
this.idempotentRepository = idempotentRepository;
this.eager = eager;
- this.processor = processor;
+ this.processor = AsyncProcessorTypeConverter.convert(processor);
}
@Override
@@ -58,6 +60,10 @@ public class IdempotentConsumer extends
}
public void process(Exchange exchange) throws Exception {
+ AsyncProcessorHelper.process(this, exchange);
+ }
+
+ public boolean process(Exchange exchange, AsyncCallback callback) {
final String messageId = messageIdExpression.evaluate(exchange, String.class);
if (messageId == null) {
throw new NoMessageIdException(exchange, messageIdExpression);
@@ -75,14 +81,15 @@ public class IdempotentConsumer extends
if (!newKey) {
// we already have this key so its a duplicate message
onDuplicateMessage(exchange, messageId);
- return;
+ callback.done(true);
+ return true;
}
// register our on completion callback
exchange.addOnCompletion(new IdempotentOnCompletion(idempotentRepository, messageId, eager));
// process the exchange
- processor.process(exchange);
+ return processor.process(exchange, callback);
}
public List<Processor> next() {
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointIdempotentConsumerTest.java (from r959549, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointIdempotentConsumerTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointIdempotentConsumerTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java&r1=959549&r2=959562&rev=959562&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointIdempotentConsumerTest.java Thu Jul 1 09:07:45 2010
@@ -20,22 +20,33 @@ import org.apache.camel.ContextTestSuppo
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
/**
* @version $Revision$
*/
-public class AsyncEndpointTest extends ContextTestSupport {
+public class AsyncEndpointIdempotentConsumerTest extends ContextTestSupport {
private static String beforeThreadName;
private static String afterThreadName;
public void testAsyncEndpoint() throws Exception {
- getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
- getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
- getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
+ getMockEndpoint("mock:before").expectedBodiesReceived("A", "B", "C");
- String reply = template.requestBody("direct:start", "Hello Camel", String.class);
- assertEquals("Bye Camel", reply);
+ MockEndpoint after = getMockEndpoint("mock:after");
+ after.expectedBodiesReceived("Bye Camel", "Bye Camel");
+ after.message(0).header("myId").isEqualTo(123);
+ after.message(1).header("myId").isEqualTo(456);
+
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedBodiesReceived("Bye Camel", "Bye Camel");
+ result.message(0).header("myId").isEqualTo(123);
+ result.message(1).header("myId").isEqualTo(456);
+
+ template.sendBodyAndHeader("direct:start", "A", "myId", 123);
+ template.sendBodyAndHeader("direct:start", "B", "myId", 123);
+ template.sendBodyAndHeader("direct:start", "C", "myId", 456);
assertMockEndpointsSatisfied();
@@ -57,6 +68,7 @@ public class AsyncEndpointTest extends C
beforeThreadName = Thread.currentThread().getName();
}
})
+ .idempotentConsumer(header("myId"), MemoryIdempotentRepository.memoryIdempotentRepository(200))
.to("async:Bye Camel")
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
@@ -70,4 +82,4 @@ public class AsyncEndpointTest extends C
};
}
-}
+}
\ No newline at end of file