You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by cm...@apache.org on 2012/04/01 20:06:54 UTC

svn commit: r1308165 - in /camel/trunk/components/camel-jaxb/src: main/java/org/apache/camel/converter/jaxb/JaxbDataFormat.java test/java/org/apache/camel/example/DataFormatConcurrentTest.java

Author: cmueller
Date: Sun Apr  1 18:06:54 2012
New Revision: 1308165

URL: http://svn.apache.org/viewvc?rev=1308165&view=rev
Log:
CAMEL-3776: Add pooling support for JAXB data format

Modified:
    camel/trunk/components/camel-jaxb/src/main/java/org/apache/camel/converter/jaxb/JaxbDataFormat.java
    camel/trunk/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatConcurrentTest.java

Modified: camel/trunk/components/camel-jaxb/src/main/java/org/apache/camel/converter/jaxb/JaxbDataFormat.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jaxb/src/main/java/org/apache/camel/converter/jaxb/JaxbDataFormat.java?rev=1308165&r1=1308164&r2=1308165&view=diff
==============================================================================
--- camel/trunk/components/camel-jaxb/src/main/java/org/apache/camel/converter/jaxb/JaxbDataFormat.java (original)
+++ camel/trunk/components/camel-jaxb/src/main/java/org/apache/camel/converter/jaxb/JaxbDataFormat.java Sun Apr  1 18:06:54 2012
@@ -21,6 +21,7 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
+import java.util.concurrent.locks.ReentrantLock;
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBElement;
 import javax.xml.bind.JAXBException;
@@ -66,6 +67,9 @@ public class JaxbDataFormat extends Serv
     private String partClass;
     private Class<Object> partialClass;
 
+    private Unmarshaller unmarshaller;
+    private ReentrantLock lock = new ReentrantLock();
+
     public JaxbDataFormat() {
     }
 
@@ -129,26 +133,29 @@ public class JaxbDataFormat extends Serv
 
     public Object unmarshal(Exchange exchange, InputStream stream) throws IOException {
         try {
-            // must create a new instance of unmarshaller as its not thread safe
             Object answer;
-            Unmarshaller unmarshaller = getContext().createUnmarshaller();
 
-            if (partialClass != null) {
-                // partial unmarshalling
-                Source source;
-                if (needFiltering(exchange)) {
-                    source = new StreamSource(createNonXmlFilterReader(exchange, stream));
+            lock.lock();
+            try {
+                if (partialClass != null) {
+                    // partial unmarshalling
+                    Source source;
+                    if (needFiltering(exchange)) {
+                        source = new StreamSource(createNonXmlFilterReader(exchange, stream));
+                    } else {
+                        source = new StreamSource(stream);
+                    }
+                    answer = unmarshaller.unmarshal(source, partialClass);
                 } else {
-                    source = new StreamSource(stream);
-                }
-                answer = unmarshaller.unmarshal(source, partialClass);
-            } else {
-                if (needFiltering(exchange)) {
-                    NonXmlFilterReader reader = createNonXmlFilterReader(exchange, stream);
-                    answer = unmarshaller.unmarshal(reader);
-                } else  {
-                    answer = unmarshaller.unmarshal(stream);
+                    if (needFiltering(exchange)) {
+                        NonXmlFilterReader reader = createNonXmlFilterReader(exchange, stream);
+                        answer = unmarshaller.unmarshal(reader);
+                    } else  {
+                        answer = unmarshaller.unmarshal(stream);
+                    }
                 }
+            }  finally {
+                lock.unlock();
             }
 
             if (answer instanceof JAXBElement && isIgnoreJAXBElement()) {
@@ -262,6 +269,7 @@ public class JaxbDataFormat extends Serv
         if (partClass != null) {
             partialClass = camelContext.getClassResolver().resolveMandatoryClass(partClass, Object.class);
         }
+        unmarshaller = getContext().createUnmarshaller();
     }
 
     @Override

Modified: camel/trunk/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatConcurrentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatConcurrentTest.java?rev=1308165&r1=1308164&r2=1308165&view=diff
==============================================================================
--- camel/trunk/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatConcurrentTest.java (original)
+++ camel/trunk/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatConcurrentTest.java Sun Apr  1 18:06:54 2012
@@ -16,8 +16,10 @@
  */
 package org.apache.camel.example;
 
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
@@ -27,13 +29,37 @@ import org.apache.camel.test.junit4.Came
 import org.junit.Test;
 
 /**
- * @version 
+ * @version
  */
 public class DataFormatConcurrentTest extends CamelTestSupport {
 
     private int size = 2000;
 
     @Test
+    public void testUnmarshallConcurrent() throws Exception {
+        int counter = 10000; // number of unmarshal requests submitted below
+        final String payload = "<purchaseOrder name='Wine' amount='123.45' price='2.22'/>";
+        final CountDownLatch latch = new CountDownLatch(counter); // one count per submitted task
+        template.setDefaultEndpointUri("direct:unmarshal"); // sendBody(payload) below relies on this default endpoint
+
+        ExecutorService pool = Executors.newFixedThreadPool(20); // 20 workers; NOTE(review): pool is never shut down in this method
+        //long start = System.currentTimeMillis();
+        for (int i = 0; i < counter; i++) {
+            pool.execute(new Runnable() {
+                public void run() {
+                    template.sendBody(payload);
+                    latch.countDown(); // NOTE(review): skipped if sendBody throws, so the await below would time out
+                }
+            });
+        }
+
+        // expected to finish in under 3 seconds on fast machines; the 10 second timeout leaves headroom for slower ones
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        //long end = System.currentTimeMillis();
+        //System.out.println("took " + (end - start) + "ms");
+    }
+
+    @Test
     public void testSendConcurrent() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(size);
@@ -73,6 +99,10 @@ public class DataFormatConcurrentTest ex
 
                 // use seda that supports concurrent consumers for concurrency
                 from("seda:start?size=" + size + "&concurrentConsumers=5").marshal(jaxb).convertBodyTo(String.class).to("mock:result");
+
+                from("direct:unmarshal")
+                        .unmarshal(jaxb)
+                        .to("mock:result");
             }
         };
     }