You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/03/01 17:47:59 UTC

svn commit: r917587 - /camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadTest.java

Author: davsclaus
Date: Mon Mar  1 16:47:59 2010
New Revision: 917587

URL: http://svn.apache.org/viewvc?rev=917587&view=rev
Log:
CAMEL-217: Added load test to run manually

Added:
    camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadTest.java
      - copied, changed from r917549, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTest.java

Copied: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadTest.java (from r917549, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadTest.java?p2=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadTest.java&p1=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTest.java&r1=917549&r2=917587&rev=917587&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadTest.java Mon Mar  1 16:47:59 2010
@@ -16,15 +16,25 @@
  */
 package org.apache.camel.component.hawtdb;
 
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
-public class HawtDBAggregateTest extends CamelTestSupport {
+public class HawtDBAggregateLoadTest extends CamelTestSupport {
+
+    private static final char[] KEYS = new char[]{'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J'};
+    private static final int SIZE = 5000;
 
+    @Before
     @Override
     public void setUp() throws Exception {
         deleteDirectory("target/data");
@@ -32,39 +42,49 @@
     }
 
     @Test
-    public void testHawtDBAggregate() throws Exception {
-        MockEndpoint mock = getMockEndpoint("mock:aggregated");
-        mock.expectedBodiesReceived("ABCDE");
-
-        template.sendBodyAndHeader("direct:start", "A", "id", 123);
-        template.sendBodyAndHeader("direct:start", "B", "id", 123);
-        template.sendBodyAndHeader("direct:start", "C", "id", 123);
-        template.sendBodyAndHeader("direct:start", "D", "id", 123);
-        template.sendBodyAndHeader("direct:start", "E", "id", 123);
+    @Ignore
+    public void testLoadTestHawtDBAggregate() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMinimumMessageCount(10);
+        mock.setResultWaitTime(60 * 60 * 1000);
+
+        ExecutorService executor = Executors.newFixedThreadPool(10);
+
+        System.out.println("Staring to send " + SIZE + " messages.");
+
+        for (int i = 0; i < SIZE; i++) {
+            final int value = 1;
+            final int key = i % 10;
+            executor.submit(new Callable<Object>() {
+                public Object call() throws Exception {
+                    char id = KEYS[key];
+                    template.sendBodyAndHeader("seda:start?size=" + SIZE, value, "id", "" + id);
+                    return null;
+                }
+            });
+        }
 
-        assertMockEndpointsSatisfied();
+        System.out.println("Sending all " + SIZE + " message done. Now waiting for aggregation to complete.");
 
-        // from endpoint should be preserved
-        assertEquals("direct://start", mock.getReceivedExchanges().get(0).getFromEndpoint().getEndpointUri());
+        assertMockEndpointsSatisfied();
     }
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             @Override
-            // START SNIPPET: e1
             public void configure() throws Exception {
-                // create the hawtdb repo
                 HawtDBAggregationRepository<String> repo = new HawtDBAggregationRepository<String>("repo1", "target/data/hawtdb.dat");
 
-                // here is the Camel route where we aggregate
-                from("direct:start")
+                from("seda:start?size=" + SIZE)
+                    .to("log:input?groupSize=500")
                     .aggregate(header("id"), new MyAggregationStrategy())
-                        // use our created hawtdb repo as aggregation repository
-                        .completionSize(5).aggregationRepository(repo)
-                        .to("mock:aggregated");
+                        .aggregationRepository(repo)
+                        .completionSize(SIZE / 10)
+                        .to("log:output?showHeaders=true")
+                        .to("mock:result")
+                    .end();
             }
-            // END SNIPPET: e1
         };
     }
 
@@ -74,10 +94,12 @@
             if (oldExchange == null) {
                 return newExchange;
             }
-            String body1 = oldExchange.getIn().getBody(String.class);
-            String body2 = newExchange.getIn().getBody(String.class);
 
-            oldExchange.getIn().setBody(body1 + body2);
+            Integer body1 = oldExchange.getIn().getBody(Integer.class);
+            Integer body2 = newExchange.getIn().getBody(Integer.class);
+            int sum = body1 + body2;
+
+            oldExchange.getIn().setBody(sum);
             return oldExchange;
         }
     }