You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/10/02 09:57:39 UTC

svn commit: r1003751 - /camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateNewExchangeAndConfirmTest.java

Author: davsclaus
Date: Sat Oct  2 07:57:38 2010
New Revision: 1003751

URL: http://svn.apache.org/viewvc?rev=1003751&view=rev
Log:
CAMEL-3189: Added unit test.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateNewExchangeAndConfirmTest.java   (contents, props changed)
      - copied, changed from r1003532, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleSizeTest.java

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateNewExchangeAndConfirmTest.java (from r1003532, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleSizeTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateNewExchangeAndConfirmTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateNewExchangeAndConfirmTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleSizeTest.java&r1=1003532&r2=1003751&rev=1003751&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleSizeTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateNewExchangeAndConfirmTest.java Sat Oct  2 07:57:38 2010
@@ -16,23 +16,38 @@
  */
 package org.apache.camel.processor.aggregator;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.processor.BodyInAggregatingStrategy;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.processor.aggregate.MemoryAggregationRepository;
 
 /**
+ * Testing CAMEL-3189
+ * 
  * @version $Revision$
  */
-public class AggregateSimpleSizeTest extends ContextTestSupport {
+public class AggregateNewExchangeAndConfirmTest extends ContextTestSupport {
 
-    public void testAggregateSimpleSize() throws Exception {
-        getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C");
+    private MyRepo repo = new MyRepo();
+
+    public void testAggregateNewExchangeAndConfirm() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:aggregated");
+        mock.expectedBodiesReceived("ABC");
 
         template.sendBodyAndHeader("direct:start", "A", "id", 123);
         template.sendBodyAndHeader("direct:start", "B", "id", 123);
         template.sendBodyAndHeader("direct:start", "C", "id", 123);
 
         assertMockEndpointsSatisfied();
+
+        // give UoW time to complete and confirm
+        Thread.sleep(500);
+
+        // must have confirmed
+        assertEquals(mock.getReceivedExchanges().get(0).getExchangeId(), repo.getId());
     }
 
     @Override
@@ -40,16 +55,40 @@ public class AggregateSimpleSizeTest ext
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                // START SNIPPET: e1
                 from("direct:start")
-                    // aggregate all exchanges correlated by the id header.
-                    // Aggregate them using the BodyInAggregatingStrategy strategy which
-                    // and after 3 messages has been aggregated then complete the aggregation
-                    // and send it to mock:aggregated
-                    .aggregate(header("id"), new BodyInAggregatingStrategy()).completionSize(3)
-                        .to("mock:aggregated");
-                // END SNIPPET: e1
+                    .aggregate(header("id"), new MyNewExchangeAggregationStrategy())
+                        .aggregationRepository(repo)
+                        .completionSize(3)
+                    .to("mock:aggregated");
             }
         };
     }
+
+    private class MyNewExchangeAggregationStrategy implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            String body = "";
+            if (oldExchange != null) {
+                body = oldExchange.getIn().getBody(String.class);
+            }
+            body += newExchange.getIn().getBody(String.class);
+            newExchange.getIn().setBody(body);
+            return newExchange;
+        }
+    }
+
+    private class MyRepo extends MemoryAggregationRepository {
+
+        private String id;
+
+        @Override
+        public void confirm(CamelContext camelContext, String exchangeId) {
+            log.info("Confirmed id: " + exchangeId);
+            this.id = exchangeId;
+        }
+
+        public String getId() {
+            return id;
+        }
+    }
 }
\ No newline at end of file

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateNewExchangeAndConfirmTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateNewExchangeAndConfirmTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date