You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by cm...@apache.org on 2012/04/06 17:03:27 UTC
svn commit: r1310380 - /camel/trunk/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatConcurrentTest.java
Author: cmueller
Date: Fri Apr 6 15:03:26 2012
New Revision: 1310380
URL: http://svn.apache.org/viewvc?rev=1310380&view=rev
Log:
CAMEL-3776: Add pooling support for JAXB data format
Modified:
camel/trunk/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatConcurrentTest.java
Modified: camel/trunk/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatConcurrentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatConcurrentTest.java?rev=1310380&r1=1310379&r2=1310380&view=diff
==============================================================================
--- camel/trunk/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatConcurrentTest.java (original)
+++ camel/trunk/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatConcurrentTest.java Fri Apr 6 15:03:26 2012
@@ -16,6 +16,7 @@
*/
package org.apache.camel.example;
+import java.io.ByteArrayInputStream;
import java.io.StringWriter;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -25,6 +26,8 @@ import java.util.concurrent.TimeUnit;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.Marshaller;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.converter.jaxb.JaxbDataFormat;
@@ -38,125 +41,96 @@ import org.junit.Test;
public class DataFormatConcurrentTest extends CamelTestSupport {
private int size = 2000;
-
+ private int warmupCount = 100;
+ private int testCycleCount = 10000;
private int fooBarSize = 50;
-
- public String createPayload() throws Exception {
- Foo foo = new Foo();
- for (int x = 0; x < fooBarSize; x++) {
- Bar bar = new Bar();
- bar.setName("Name: " + x);
- bar.setValue("value: " + x);
- foo.getBarRefs().add(bar);
- }
- Marshaller m = JAXBContext.newInstance(Foo.class, Bar.class).createMarshaller();
- StringWriter writer = new StringWriter();
- m.marshal(foo, writer);
- return writer.toString();
- }
@Test
- public void testUnmarshallConcurrent() throws Exception {
- int counter = 10000;
- //final String payload = "<purchaseOrder name='Wine' amount='123.45' price='2.22'/>";
- final String payload = createPayload();
- //System.out.println("Length: " + payload.length());
- final CountDownLatch latch = new CountDownLatch(counter);
+ public void testUnmarshalConcurrent() throws Exception {
template.setDefaultEndpointUri("direct:unmarshal");
+ final CountDownLatch latch = new CountDownLatch(warmupCount + testCycleCount);
- ExecutorService pool = Executors.newFixedThreadPool(20);
- //long start = System.currentTimeMillis();
- for (int i = 0; i < counter; i++) {
- pool.execute(new Runnable() {
- public void run() {
- template.sendBody(payload);
- latch.countDown();
- }
- });
- }
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:unmarshal")
+ .unmarshal(new JaxbDataFormat("org.apache.camel.example"))
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ latch.countDown();
+ }
+ });
+ }
+ });
- // should finish on fast machines in less than 3 seconds
- assertTrue(latch.await(15, TimeUnit.SECONDS));
- //long end = System.currentTimeMillis();
- //System.out.println("took " + (end - start) + "ms");
+ unmarshal(latch);
}
@Test
- public void testUnmarshallFallbackConcurrent() throws Exception {
- int counter = 10000;
- final String payload = "<purchaseOrder name='Wine' amount='123.45' price='2.22'/>";
- final CountDownLatch latch = new CountDownLatch(counter);
+ public void testUnmarshalFallbackConcurrent() throws Exception {
template.setDefaultEndpointUri("direct:unmarshalFallback");
+ final CountDownLatch latch = new CountDownLatch(warmupCount + testCycleCount);
- ExecutorService pool = Executors.newFixedThreadPool(20);
- //long start = System.currentTimeMillis();
- for (int i = 0; i < counter; i++) {
- pool.execute(new Runnable() {
- public void run() {
- template.sendBody(payload);
- latch.countDown();
- }
- });
- }
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:unmarshalFallback")
+ .convertBodyTo(Foo.class)
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ latch.countDown();
+ }
+ });
+ }
+ });
- // should finish on fast machines in less than 3 seconds
- assertTrue(latch.await(10, TimeUnit.SECONDS));
- //long end = System.currentTimeMillis();
- //System.out.println("took " + (end - start) + "ms");
+ unmarshal(latch);
}
@Test
- public void testMarshallFallbackConcurrent() throws Exception {
- int counter = 10000;
- final PurchaseOrder order = new PurchaseOrder();
- order.setName("Wine");
- order.setAmount(123.45);
- order.setPrice(2.22);
- final CountDownLatch latch = new CountDownLatch(counter);
- template.setDefaultEndpointUri("direct:marshalFallback");
+ public void testMarshallConcurrent() throws Exception {
+ template.setDefaultEndpointUri("direct:marshal");
+ final CountDownLatch latch = new CountDownLatch(warmupCount + testCycleCount);
- ExecutorService pool = Executors.newFixedThreadPool(20);
- //long start = System.currentTimeMillis();
- for (int i = 0; i < counter; i++) {
- pool.execute(new Runnable() {
- public void run() {
- template.sendBody(order);
- latch.countDown();
- }
- });
- }
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:marshal")
+ .marshal(new JaxbDataFormat("org.apache.camel.example"))
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ latch.countDown();
+ }
+ });
+ }
+ });
- // should finish on fast machines in less than 3 seconds
- assertTrue(latch.await(10, TimeUnit.SECONDS));
- //long end = System.currentTimeMillis();
- //System.out.println("took " + (end - start) + "ms");
+ marshal(latch);
}
@Test
- public void testMarshallConcurrent() throws Exception {
- int counter = 10000;
- final PurchaseOrder order = new PurchaseOrder();
- order.setName("Wine");
- order.setAmount(123.45);
- order.setPrice(2.22);
- final CountDownLatch latch = new CountDownLatch(counter);
- template.setDefaultEndpointUri("direct:marshal");
+ public void testMarshallFallbackConcurrent() throws Exception {
+ template.setDefaultEndpointUri("direct:marshalFallback");
+ final CountDownLatch latch = new CountDownLatch(warmupCount + testCycleCount);
- ExecutorService pool = Executors.newFixedThreadPool(20);
- //long start = System.currentTimeMillis();
- for (int i = 0; i < counter; i++) {
- pool.execute(new Runnable() {
- public void run() {
- template.sendBody(order);
- latch.countDown();
- }
- });
- }
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:marshalFallback")
+ .convertBodyTo(String.class)
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ latch.countDown();
+ }
+ });
+ }
+ });
- // should finish on fast machines in less than 3 seconds
- assertTrue(latch.await(10, TimeUnit.SECONDS));
- //long end = System.currentTimeMillis();
- //System.out.println("took " + (end - start) + "ms");
+ marshal(latch);
}
@Test
@@ -192,31 +166,118 @@ public class DataFormatConcurrentTest ex
assertMockEndpointsSatisfied();
}
+ public void unmarshal(final CountDownLatch latch) throws Exception {
+ // warm up
+ ByteArrayInputStream[] warmUpPayloads = createPayloads(warmupCount);
+ for (ByteArrayInputStream payload : warmUpPayloads) {
+ template.sendBody(payload);
+ }
+
+ final ByteArrayInputStream[] payloads = createPayloads(testCycleCount);
+ ExecutorService pool = Executors.newFixedThreadPool(20);
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < payloads.length; i++) {
+ final int finalI = i;
+ pool.execute(new Runnable() {
+ public void run() {
+ template.sendBody(payloads[finalI]);
+ }
+ });
+ }
+
+ latch.await();
+ long end = System.currentTimeMillis();
+ //System.out.println("sending " + payloads.length + " messages to " + template.getDefaultEndpoint().getEndpointUri() + " took " + (end - start) + "ms");
+ }
+
+ public void marshal(final CountDownLatch latch) throws Exception {
+ // warm up
+ Foo[] warmUpPayloads = createFoo(warmupCount);
+ for (Foo payload : warmUpPayloads) {
+ template.sendBody(payload);
+ }
+
+ final Foo[] payloads = createFoo(testCycleCount);
+ ExecutorService pool = Executors.newFixedThreadPool(20);
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < payloads.length; i++) {
+ final int finalI = i;
+ pool.execute(new Runnable() {
+ public void run() {
+ template.sendBody(payloads[finalI]);
+ }
+ });
+ }
+
+ latch.await();
+ long end = System.currentTimeMillis();
+ //System.out.println("sending " + payloads.length + " messages to " + template.getDefaultEndpoint().getEndpointUri() + " took " + (end - start) + "ms");
+ }
+
+ /**
+ * the individual size of one record is:
+ * fooBarSize = 1 -> 104 bytes
+ * fooBarSize = 50 -> 2046 bytes
+ * @return the payloads used for this stress test
+ * @throws Exception
+ */
+ public Foo[] createFoo(int testCount) throws Exception {
+ Foo[] foos = new Foo[testCount];
+ for (int i = 0; i < testCount; i++) {
+ Foo foo = new Foo();
+ for (int x = 0; x < fooBarSize; x++) {
+ Bar bar = new Bar();
+ bar.setName("Name: " + x);
+ bar.setValue("value: " + x);
+ foo.getBarRefs().add(bar);
+ }
+
+ foos[i] = foo;
+ }
+
+ return foos;
+ }
+
+ /**
+ * the individual size of one record is:
+ * fooBarSize = 1 -> 104 bytes
+ * fooBarSize = 50 -> 2046 bytes
+ * @return the payloads used for this stress test
+ * @throws Exception
+ */
+ public ByteArrayInputStream[] createPayloads(int testCount) throws Exception {
+ Foo foo = new Foo();
+ for (int x = 0; x < fooBarSize; x++) {
+ Bar bar = new Bar();
+ bar.setName("Name: " + x);
+ bar.setValue("value: " + x);
+ foo.getBarRefs().add(bar);
+ }
+ Marshaller m = JAXBContext.newInstance(Foo.class, Bar.class).createMarshaller();
+ StringWriter writer = new StringWriter();
+ m.marshal(foo, writer);
+
+ byte[] payload = writer.toString().getBytes();
+ ByteArrayInputStream[] streams = new ByteArrayInputStream[testCount];
+ for (int i = 0; i < testCount; i++) {
+ streams[i] = new ByteArrayInputStream(payload);
+ }
+
+ return streams;
+ }
+
+ @Override
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
DataFormat jaxb = new JaxbDataFormat("org.apache.camel.example");
// use seda that supports concurrent consumers for concurrency
- from("seda:start?size=" + size + "&concurrentConsumers=5").marshal(jaxb).convertBodyTo(String.class).to("mock:result");
-
- from("direct:unmarshal")
- .unmarshal(jaxb)
- .to("mock:result");
-
- from("direct:marshal")
+ from("seda:start?size=" + size + "&concurrentConsumers=5")
.marshal(jaxb)
- .to("mock:result");
-
- from("direct:unmarshalFallback")
- .convertBodyTo(PurchaseOrder.class)
- .to("mock:result");
-
- from("direct:marshalFallback")
.convertBodyTo(String.class)
.to("mock:result");
}
};
}
-
}
\ No newline at end of file