You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by cm...@apache.org on 2012/04/11 22:57:03 UTC

svn commit: r1325007 - /camel/branches/camel-2.8.x/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatConcurrentTest.java

Author: cmueller
Date: Wed Apr 11 20:57:03 2012
New Revision: 1325007

URL: http://svn.apache.org/viewvc?rev=1325007&view=rev
Log:
CAMEL-3776: Add pooling support for JAXB data format

Modified:
    camel/branches/camel-2.8.x/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatConcurrentTest.java

Modified: camel/branches/camel-2.8.x/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatConcurrentTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatConcurrentTest.java?rev=1325007&r1=1325006&r2=1325007&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatConcurrentTest.java (original)
+++ camel/branches/camel-2.8.x/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatConcurrentTest.java Wed Apr 11 20:57:03 2012
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.example;
 
+import java.io.ByteArrayInputStream;
 import java.io.StringWriter;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -25,6 +26,8 @@ import java.util.concurrent.TimeUnit;
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.Marshaller;
 
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.converter.jaxb.JaxbDataFormat;
@@ -38,125 +41,96 @@ import org.junit.Test;
 public class DataFormatConcurrentTest extends CamelTestSupport {
 
     private int size = 2000;
-    
+    private int warmupCount = 100;
+    private int testCycleCount = 10000;
     private int fooBarSize = 50;
-    
-    public String createPayload() throws Exception {
-        Foo foo = new Foo();
-        for (int x = 0; x < fooBarSize; x++) {
-            Bar bar = new Bar();
-            bar.setName("Name: " + x);
-            bar.setValue("value: " + x);
-            foo.getBarRefs().add(bar);
-        }
-        Marshaller m = JAXBContext.newInstance(Foo.class, Bar.class).createMarshaller();
-        StringWriter writer = new StringWriter();
-        m.marshal(foo, writer);
-        return writer.toString();
-    }
 
     @Test
-    public void testUnmarshallConcurrent() throws Exception {
-        int counter = 10000;
-        //final String payload = "<purchaseOrder name='Wine' amount='123.45' price='2.22'/>";
-        final String payload = createPayload();
-        //System.out.println("Length: " + payload.length());
-        final CountDownLatch latch = new CountDownLatch(counter);
+    public void testUnmarshalConcurrent() throws Exception {
         template.setDefaultEndpointUri("direct:unmarshal");
+        final CountDownLatch latch = new CountDownLatch(warmupCount + testCycleCount);
 
-        ExecutorService pool = Executors.newFixedThreadPool(20);
-        //long start = System.currentTimeMillis();
-        for (int i = 0; i < counter; i++) {
-            pool.execute(new Runnable() {
-                public void run() {
-                    template.sendBody(payload);
-                    latch.countDown();
-                }
-            });
-        }
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:unmarshal")
+                        .unmarshal(new JaxbDataFormat("org.apache.camel.example"))
+                        .process(new Processor() {
+                            @Override
+                            public void process(Exchange exchange) throws Exception {
+                                latch.countDown();
+                            }
+                        });
+            }
+        });
 
-        // should finish on fast machines in less than 3 seconds
-        assertTrue(latch.await(15, TimeUnit.SECONDS));
-        //long end = System.currentTimeMillis();
-        //System.out.println("took " + (end - start) + "ms");
+        unmarshal(latch);
     }
 
     @Test
-    public void testUnmarshallFallbackConcurrent() throws Exception {
-        int counter = 10000;
-        final String payload = "<purchaseOrder name='Wine' amount='123.45' price='2.22'/>";
-        final CountDownLatch latch = new CountDownLatch(counter);
+    public void testUnmarshalFallbackConcurrent() throws Exception {
         template.setDefaultEndpointUri("direct:unmarshalFallback");
+        final CountDownLatch latch = new CountDownLatch(warmupCount + testCycleCount);
 
-        ExecutorService pool = Executors.newFixedThreadPool(20);
-        //long start = System.currentTimeMillis();
-        for (int i = 0; i < counter; i++) {
-            pool.execute(new Runnable() {
-                public void run() {
-                    template.sendBody(payload);
-                    latch.countDown();
-                }
-            });
-        }
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:unmarshalFallback")
+                        .convertBodyTo(Foo.class)
+                        .process(new Processor() {
+                            @Override
+                            public void process(Exchange exchange) throws Exception {
+                                latch.countDown();
+                            }
+                        });
+            }
+        });
 
-        // should finish on fast machines in less than 3 seconds
-        assertTrue(latch.await(10, TimeUnit.SECONDS));
-        //long end = System.currentTimeMillis();
-        //System.out.println("took " + (end - start) + "ms");
+        unmarshal(latch);
     }
 
     @Test
-    public void testMarshallFallbackConcurrent() throws Exception {
-        int counter = 10000;
-        final PurchaseOrder order = new PurchaseOrder();
-        order.setName("Wine");
-        order.setAmount(123.45);
-        order.setPrice(2.22);
-        final CountDownLatch latch = new CountDownLatch(counter);
-        template.setDefaultEndpointUri("direct:marshalFallback");
+    public void testMarshallConcurrent() throws Exception {
+        template.setDefaultEndpointUri("direct:marshal");
+        final CountDownLatch latch = new CountDownLatch(warmupCount + testCycleCount);
 
-        ExecutorService pool = Executors.newFixedThreadPool(20);
-        //long start = System.currentTimeMillis();
-        for (int i = 0; i < counter; i++) {
-            pool.execute(new Runnable() {
-                public void run() {
-                    template.sendBody(order);
-                    latch.countDown();
-                }
-            });
-        }
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:marshal")
+                        .marshal(new JaxbDataFormat("org.apache.camel.example"))
+                        .process(new Processor() {
+                            @Override
+                            public void process(Exchange exchange) throws Exception {
+                                latch.countDown();
+                            }
+                        });
+            }
+        });
 
-        // should finish on fast machines in less than 3 seconds
-        assertTrue(latch.await(10, TimeUnit.SECONDS));
-        //long end = System.currentTimeMillis();
-        //System.out.println("took " + (end - start) + "ms");
+        marshal(latch);
     }
 
     @Test
-    public void testMarshallConcurrent() throws Exception {
-        int counter = 10000;
-        final PurchaseOrder order = new PurchaseOrder();
-        order.setName("Wine");
-        order.setAmount(123.45);
-        order.setPrice(2.22);
-        final CountDownLatch latch = new CountDownLatch(counter);
-        template.setDefaultEndpointUri("direct:marshal");
+    public void testMarshallFallbackConcurrent() throws Exception {
+        template.setDefaultEndpointUri("direct:marshalFallback");
+        final CountDownLatch latch = new CountDownLatch(warmupCount + testCycleCount);
 
-        ExecutorService pool = Executors.newFixedThreadPool(20);
-        //long start = System.currentTimeMillis();
-        for (int i = 0; i < counter; i++) {
-            pool.execute(new Runnable() {
-                public void run() {
-                    template.sendBody(order);
-                    latch.countDown();
-                }
-            });
-        }
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:marshalFallback")
+                        .convertBodyTo(String.class)
+                        .process(new Processor() {
+                            @Override
+                            public void process(Exchange exchange) throws Exception {
+                                latch.countDown();
+                            }
+                        });
+            }
+        });
 
-        // should finish on fast machines in less than 3 seconds
-        assertTrue(latch.await(10, TimeUnit.SECONDS));
-        //long end = System.currentTimeMillis();
-        //System.out.println("took " + (end - start) + "ms");
+        marshal(latch);
     }
 
     @Test
@@ -192,31 +166,118 @@ public class DataFormatConcurrentTest ex
         assertMockEndpointsSatisfied();
     }
 
+    public void unmarshal(final CountDownLatch latch) throws Exception {
+        // warm up
+        ByteArrayInputStream[] warmUpPayloads = createPayloads(warmupCount);
+        for (ByteArrayInputStream payload : warmUpPayloads) {
+            template.sendBody(payload);
+        }
+
+        final ByteArrayInputStream[] payloads = createPayloads(testCycleCount);
+        ExecutorService pool = Executors.newFixedThreadPool(20);
+        long start = System.currentTimeMillis();
+        for (int i = 0; i < payloads.length; i++) {
+            final int finalI = i;
+            pool.execute(new Runnable() {
+                public void run() {
+                    template.sendBody(payloads[finalI]);
+                }
+            });
+        }
+
+        latch.await();
+        long end = System.currentTimeMillis();
+        //System.out.println("sending " + payloads.length + " messages to " + template.getDefaultEndpoint().getEndpointUri() + " took " + (end - start) + "ms");
+    }
+
+    public void marshal(final CountDownLatch latch) throws Exception {
+        // warm up
+        Foo[] warmUpPayloads = createFoo(warmupCount);
+        for (Foo payload : warmUpPayloads) {
+            template.sendBody(payload);
+        }
+
+        final Foo[] payloads = createFoo(testCycleCount);
+        ExecutorService pool = Executors.newFixedThreadPool(20);
+        long start = System.currentTimeMillis();
+        for (int i = 0; i < payloads.length; i++) {
+            final int finalI = i;
+            pool.execute(new Runnable() {
+                public void run() {
+                    template.sendBody(payloads[finalI]);
+                }
+            });
+        }
+
+        latch.await();
+        long end = System.currentTimeMillis();
+        //System.out.println("sending " + payloads.length + " messages to " + template.getDefaultEndpoint().getEndpointUri() + " took " + (end - start) + "ms");
+    }
+
+    /**
+     * the individual size of one record is:
+     * fooBarSize = 1  -> 104 bytes
+     * fooBarSize = 50 -> 2046 bytes
+     * @return the payloads used for this stress test
+     * @throws Exception
+     */
+    public Foo[] createFoo(int testCount) throws Exception {
+        Foo[] foos = new Foo[testCount];
+        for (int i = 0; i < testCount; i++) {
+            Foo foo = new Foo();
+            for (int x = 0; x < fooBarSize; x++) {
+                Bar bar = new Bar();
+                bar.setName("Name: " + x);
+                bar.setValue("value: " + x);
+                foo.getBarRefs().add(bar);
+            }
+
+            foos[i] = foo;
+        }
+
+        return foos;
+    }
+
+    /**
+     * the individual size of one record is:
+     * fooBarSize = 1  -> 104 bytes
+     * fooBarSize = 50 -> 2046 bytes
+     * @return the payloads used for this stress test
+     * @throws Exception
+     */
+    public ByteArrayInputStream[] createPayloads(int testCount) throws Exception {
+        Foo foo = new Foo();
+        for (int x = 0; x < fooBarSize; x++) {
+            Bar bar = new Bar();
+            bar.setName("Name: " + x);
+            bar.setValue("value: " + x);
+            foo.getBarRefs().add(bar);
+        }
+        Marshaller m = JAXBContext.newInstance(Foo.class, Bar.class).createMarshaller();
+        StringWriter writer = new StringWriter();
+        m.marshal(foo, writer);
+
+        byte[] payload = writer.toString().getBytes();
+        ByteArrayInputStream[] streams = new ByteArrayInputStream[testCount];
+        for (int i = 0; i < testCount; i++) {
+            streams[i] = new ByteArrayInputStream(payload);
+        }
+
+        return streams;
+    }
+
+    @Override
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
                 DataFormat jaxb = new JaxbDataFormat("org.apache.camel.example");
 
                 // use seda that supports concurrent consumers for concurrency
-                from("seda:start?size=" + size + "&concurrentConsumers=5").marshal(jaxb).convertBodyTo(String.class).to("mock:result");
-
-                from("direct:unmarshal")
-                        .unmarshal(jaxb)
-                        .to("mock:result");
-
-                from("direct:marshal")
+                from("seda:start?size=" + size + "&concurrentConsumers=5")
                         .marshal(jaxb)
-                        .to("mock:result");
-
-                from("direct:unmarshalFallback")
-                        .convertBodyTo(PurchaseOrder.class)
-                        .to("mock:result");
-
-                from("direct:marshalFallback")
                         .convertBodyTo(String.class)
                         .to("mock:result");
             }
         };
     }
-
 }
\ No newline at end of file