You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by cm...@apache.org on 2012/04/01 20:06:54 UTC
svn commit: r1308165 - in /camel/trunk/components/camel-jaxb/src:
main/java/org/apache/camel/converter/jaxb/JaxbDataFormat.java
test/java/org/apache/camel/example/DataFormatConcurrentTest.java
Author: cmueller
Date: Sun Apr 1 18:06:54 2012
New Revision: 1308165
URL: http://svn.apache.org/viewvc?rev=1308165&view=rev
Log:
CAMEL-3776: Add pooling support for JAXB data format
Modified:
camel/trunk/components/camel-jaxb/src/main/java/org/apache/camel/converter/jaxb/JaxbDataFormat.java
camel/trunk/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatConcurrentTest.java
Modified: camel/trunk/components/camel-jaxb/src/main/java/org/apache/camel/converter/jaxb/JaxbDataFormat.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jaxb/src/main/java/org/apache/camel/converter/jaxb/JaxbDataFormat.java?rev=1308165&r1=1308164&r2=1308165&view=diff
==============================================================================
--- camel/trunk/components/camel-jaxb/src/main/java/org/apache/camel/converter/jaxb/JaxbDataFormat.java (original)
+++ camel/trunk/components/camel-jaxb/src/main/java/org/apache/camel/converter/jaxb/JaxbDataFormat.java Sun Apr 1 18:06:54 2012
@@ -21,6 +21,7 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
+import java.util.concurrent.locks.ReentrantLock;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
@@ -66,6 +67,9 @@ public class JaxbDataFormat extends Serv
private String partClass;
private Class<Object> partialClass;
+ private Unmarshaller unmarshaller;
+ private ReentrantLock lock = new ReentrantLock();
+
public JaxbDataFormat() {
}
@@ -129,26 +133,29 @@ public class JaxbDataFormat extends Serv
public Object unmarshal(Exchange exchange, InputStream stream) throws IOException {
try {
- // must create a new instance of unmarshaller as its not thread safe
Object answer;
- Unmarshaller unmarshaller = getContext().createUnmarshaller();
- if (partialClass != null) {
- // partial unmarshalling
- Source source;
- if (needFiltering(exchange)) {
- source = new StreamSource(createNonXmlFilterReader(exchange, stream));
+ lock.lock();
+ try {
+ if (partialClass != null) {
+ // partial unmarshalling
+ Source source;
+ if (needFiltering(exchange)) {
+ source = new StreamSource(createNonXmlFilterReader(exchange, stream));
+ } else {
+ source = new StreamSource(stream);
+ }
+ answer = unmarshaller.unmarshal(source, partialClass);
} else {
- source = new StreamSource(stream);
- }
- answer = unmarshaller.unmarshal(source, partialClass);
- } else {
- if (needFiltering(exchange)) {
- NonXmlFilterReader reader = createNonXmlFilterReader(exchange, stream);
- answer = unmarshaller.unmarshal(reader);
- } else {
- answer = unmarshaller.unmarshal(stream);
+ if (needFiltering(exchange)) {
+ NonXmlFilterReader reader = createNonXmlFilterReader(exchange, stream);
+ answer = unmarshaller.unmarshal(reader);
+ } else {
+ answer = unmarshaller.unmarshal(stream);
+ }
}
+ } finally {
+ lock.unlock();
}
if (answer instanceof JAXBElement && isIgnoreJAXBElement()) {
@@ -262,6 +269,7 @@ public class JaxbDataFormat extends Serv
if (partClass != null) {
partialClass = camelContext.getClassResolver().resolveMandatoryClass(partClass, Object.class);
}
+ unmarshaller = getContext().createUnmarshaller();
}
@Override
Modified: camel/trunk/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatConcurrentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatConcurrentTest.java?rev=1308165&r1=1308164&r2=1308165&view=diff
==============================================================================
--- camel/trunk/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatConcurrentTest.java (original)
+++ camel/trunk/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatConcurrentTest.java Sun Apr 1 18:06:54 2012
@@ -16,8 +16,10 @@
*/
package org.apache.camel.example;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
@@ -27,13 +29,37 @@ import org.apache.camel.test.junit4.Came
import org.junit.Test;
/**
- * @version
+ * @version
*/
public class DataFormatConcurrentTest extends CamelTestSupport {
private int size = 2000;
@Test
+ public void testUnmarshallConcurrent() throws Exception {
+ int counter = 10000;
+ final String payload = "<purchaseOrder name='Wine' amount='123.45' price='2.22'/>";
+ final CountDownLatch latch = new CountDownLatch(counter);
+ template.setDefaultEndpointUri("direct:unmarshal");
+
+ ExecutorService pool = Executors.newFixedThreadPool(20);
+ //long start = System.currentTimeMillis();
+ for (int i = 0; i < counter; i++) {
+ pool.execute(new Runnable() {
+ public void run() {
+ template.sendBody(payload);
+ latch.countDown();
+ }
+ });
+ }
+
+ // should finish on fast machines in less than 3 seconds
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ //long end = System.currentTimeMillis();
+ //System.out.println("took " + (end - start) + "ms");
+ }
+
+ @Test
public void testSendConcurrent() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedMessageCount(size);
@@ -73,6 +99,10 @@ public class DataFormatConcurrentTest ex
// use seda that supports concurrent consumers for concurrency
from("seda:start?size=" + size + "&concurrentConsumers=5").marshal(jaxb).convertBodyTo(String.class).to("mock:result");
+
+ from("direct:unmarshal")
+ .unmarshal(jaxb)
+ .to("mock:result");
}
};
}