You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/10/02 09:57:39 UTC
svn commit: r1003751 -
/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateNewExchangeAndConfirmTest.java
Author: davsclaus
Date: Sat Oct 2 07:57:38 2010
New Revision: 1003751
URL: http://svn.apache.org/viewvc?rev=1003751&view=rev
Log:
CAMEL-3189: Added unit test.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateNewExchangeAndConfirmTest.java (contents, props changed)
- copied, changed from r1003532, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleSizeTest.java
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateNewExchangeAndConfirmTest.java (from r1003532, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleSizeTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateNewExchangeAndConfirmTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateNewExchangeAndConfirmTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleSizeTest.java&r1=1003532&r2=1003751&rev=1003751&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleSizeTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateNewExchangeAndConfirmTest.java Sat Oct 2 07:57:38 2010
@@ -16,23 +16,38 @@
*/
package org.apache.camel.processor.aggregator;
+import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.processor.BodyInAggregatingStrategy;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.processor.aggregate.MemoryAggregationRepository;
/**
+ * Testing CAMEL-3189
+ *
* @version $Revision$
*/
-public class AggregateSimpleSizeTest extends ContextTestSupport {
+public class AggregateNewExchangeAndConfirmTest extends ContextTestSupport {
- public void testAggregateSimpleSize() throws Exception {
- getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C");
+ private MyRepo repo = new MyRepo();
+
+ public void testAggregateNewExchangeAndConfirm() throws Exception { // sends three messages sharing one "id" header and checks the aggregate and the confirmed exchange id
+ MockEndpoint mock = getMockEndpoint("mock:aggregated");
+ mock.expectedBodiesReceived("ABC"); // the bodies A, B and C appended in send order
template.sendBodyAndHeader("direct:start", "A", "id", 123);
template.sendBodyAndHeader("direct:start", "B", "id", 123);
template.sendBodyAndHeader("direct:start", "C", "id", 123); // third message matches the route's completionSize(3)
assertMockEndpointsSatisfied();
+
+ // give the UoW time to complete and call confirm on the repository
+ Thread.sleep(500);
+
+ // the id passed to repo.confirm must be the id of the exchange received at mock:aggregated
+ assertEquals(mock.getReceivedExchanges().get(0).getExchangeId(), repo.getId());
}
@Override
@@ -40,16 +55,40 @@ public class AggregateSimpleSizeTest ext
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- // START SNIPPET: e1
from("direct:start")
- // aggregate all exchanges correlated by the id header.
- // Aggregate them using the BodyInAggregatingStrategy strategy which
- // and after 3 messages has been aggregated then complete the aggregation
- // and send it to mock:aggregated
- .aggregate(header("id"), new BodyInAggregatingStrategy()).completionSize(3)
- .to("mock:aggregated");
- // END SNIPPET: e1
+ .aggregate(header("id"), new MyNewExchangeAggregationStrategy())
+ .aggregationRepository(repo)
+ .completionSize(3)
+ .to("mock:aggregated");
}
};
}
+
+ private class MyNewExchangeAggregationStrategy implements AggregationStrategy { // appends each incoming body to the aggregated body and hands back the NEW exchange
+
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ String body = ""; // stays empty when there is no oldExchange
+ if (oldExchange != null) {
+ body = oldExchange.getIn().getBody(String.class); // start from the body held by oldExchange
+ }
+ body += newExchange.getIn().getBody(String.class);
+ newExchange.getIn().setBody(body);
+ return newExchange; // the new exchange (not oldExchange) carries the combined body
+ }
+ }
+
+ private class MyRepo extends MemoryAggregationRepository { // repository that remembers the exchange id passed to confirm()
+
+ private String id; // exchange id from the most recent confirm() call; unset until then
+
+ @Override
+ public void confirm(CamelContext camelContext, String exchangeId) {
+ log.info("Confirmed id: " + exchangeId);
+ this.id = exchangeId; // NOTE(review): super.confirm() is not called here - presumably unnecessary for the in-memory repository; confirm
+ }
+
+ public String getId() { // returns the id recorded by confirm()
+ return id;
+ }
+ }
}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateNewExchangeAndConfirmTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateNewExchangeAndConfirmTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date