You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/02/24 07:25:34 UTC

[3/3] git commit: CAMEL-7239 Address the SchemaFactory thread safe issue.

CAMEL-7239 Address the SchemaFactory thread safe issue.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/06734f1d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/06734f1d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/06734f1d

Branch: refs/heads/camel-2.11.x
Commit: 06734f1d3ce1f780f3648e5f93719fa4516e9bd1
Parents: d2fb229
Author: Willem Jiang <wi...@gmail.com>
Authored: Mon Feb 24 12:46:32 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Mon Feb 24 14:23:15 2014 +0800

----------------------------------------------------------------------
 .../validation/ValidatingProcessor.java         | 35 +++++++++++++++-----
 .../component/validator/ValidatorRouteTest.java | 30 +++++++++++++++++
 2 files changed, 57 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/06734f1d/camel-core/src/main/java/org/apache/camel/processor/validation/ValidatingProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/validation/ValidatingProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/validation/ValidatingProcessor.java
index ee15fb4..243206b 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/validation/ValidatingProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/validation/ValidatingProcessor.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
 import java.io.StringWriter;
 import java.net.URL;
 import java.util.Collections;
+
 import javax.xml.XMLConstants;
 import javax.xml.parsers.ParserConfigurationException;
 import javax.xml.transform.Result;
@@ -40,6 +41,7 @@ import javax.xml.validation.Validator;
 
 import org.w3c.dom.Node;
 import org.w3c.dom.ls.LSResourceResolver;
+
 import org.xml.sax.SAXException;
 import org.xml.sax.SAXParseException;
 
@@ -53,6 +55,7 @@ import org.apache.camel.util.IOHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * A processor which validates the XML version of the inbound message body
  * against some schema either in XSD or RelaxNG
@@ -61,9 +64,9 @@ public class ValidatingProcessor implements Processor {
     private static final Logger LOG = LoggerFactory.getLogger(ValidatingProcessor.class);
     private XmlConverter converter = new XmlConverter();
     private String schemaLanguage = XMLConstants.W3C_XML_SCHEMA_NS_URI;
-    private Schema schema;
+    private volatile Schema schema;
     private Source schemaSource;
-    private SchemaFactory schemaFactory;
+    private volatile SchemaFactory schemaFactory;
     private URL schemaUrl;
     private File schemaFile;
     private byte[] schemaAsByteArray;
@@ -175,7 +178,11 @@ public class ValidatingProcessor implements Processor {
 
     public Schema getSchema() throws IOException, SAXException {
         if (schema == null) {
-            schema = createSchema();
+            synchronized (this) {
+                if (schema == null) {
+                    schema = createSchema();
+                }
+            }
         }
         return schema;
     }
@@ -229,7 +236,11 @@ public class ValidatingProcessor implements Processor {
 
     public SchemaFactory getSchemaFactory() {
         if (schemaFactory == null) {
-            schemaFactory = createSchemaFactory();
+            synchronized (this) {
+                if (schemaFactory == null) {
+                    schemaFactory = createSchemaFactory();
+                }
+            }
         }
         return schemaFactory;
     }
@@ -321,21 +332,29 @@ public class ValidatingProcessor implements Processor {
 
         URL url = getSchemaUrl();
         if (url != null) {
-            return factory.newSchema(url);
+            synchronized (this) {
+                return factory.newSchema(url);
+            }
         }
 
         File file = getSchemaFile();
         if (file != null) {
-            return factory.newSchema(file);
+            synchronized (this) {
+                return factory.newSchema(file);
+            }
         }
 
         byte[] bytes = getSchemaAsByteArray();
         if (bytes != null) {
-            return factory.newSchema(new StreamSource(new ByteArrayInputStream(schemaAsByteArray)));
+            synchronized (this) {
+                return factory.newSchema(new StreamSource(new ByteArrayInputStream(schemaAsByteArray)));
+            }
         }
 
         Source source = getSchemaSource();
-        return factory.newSchema(source);
+        synchronized (this) {
+            return factory.newSchema(source);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/camel/blob/06734f1d/camel-core/src/test/java/org/apache/camel/component/validator/ValidatorRouteTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/validator/ValidatorRouteTest.java b/camel-core/src/test/java/org/apache/camel/component/validator/ValidatorRouteTest.java
index 036704f..aa78644 100644
--- a/camel-core/src/test/java/org/apache/camel/component/validator/ValidatorRouteTest.java
+++ b/camel-core/src/test/java/org/apache/camel/component/validator/ValidatorRouteTest.java
@@ -16,6 +16,11 @@
  */
 package org.apache.camel.component.validator;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
@@ -129,6 +134,31 @@ public class ValidatorRouteTest extends ContextTestSupport {
 
         MockEndpoint.assertIsSatisfied(validEndpoint, invalidEndpoint, finallyEndpoint);
     }
+    
+    public void testConcurrentUseNotASharedSchema() throws Exception {
+        validEndpoint.expectedMessageCount(10);
+        // latch for the 10 exchanges we expect
+        final CountDownLatch latch = new CountDownLatch(10);
+        // setup a task executor to be able send the messages in parallel
+        ExecutorService executor = Executors.newCachedThreadPool();
+        for (int i = 0; i < 10; i++) {
+            executor.execute(new Runnable() {
+                public void run() {
+                    template.requestBody("direct:useNotASharedSchema", "<mail xmlns='http://foo.com/bar'><subject>Hey</subject><body>Hello world!</body></mail>");
+                    latch.countDown();
+                }
+            });
+        }
+
+        try {
+            // wait for test completion, timeout after 30 sec to let other unit test run to not wait forever
+            assertTrue(latch.await(30000L, TimeUnit.MILLISECONDS));
+            assertEquals("Latch should be zero", 0, latch.getCount());
+        } finally {
+            executor.shutdown();
+        }
+        MockEndpoint.assertIsSatisfied(validEndpoint, invalidEndpoint, finallyEndpoint);
+    }
 
     @Override
     protected void setUp() throws Exception {