You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/12/28 13:07:51 UTC

camel git commit: CAMEL-8188 Support to configure the scripte engine from CamelContext propertes

Repository: camel
Updated Branches:
  refs/heads/master 2e9fda50f -> 1c1f6e417


CAMEL-8188 Support to configure the scripte engine from CamelContext propertes


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1c1f6e41
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1c1f6e41
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1c1f6e41

Branch: refs/heads/master
Commit: 1c1f6e41795a3408efc9bf8e7805a7b9499d27c8
Parents: 2e9fda5
Author: Willem Jiang <wi...@gmail.com>
Authored: Sun Dec 28 20:07:04 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Sun Dec 28 20:07:04 2014 +0800

----------------------------------------------------------------------
 .../main/java/org/apache/camel/Exchange.java    |   3 +
 .../camel/builder/script/ScriptBuilder.java     |  69 +++++++--
 .../script/JRubyScriptThreadSafeTest.java       | 150 +++++++++++++++++++
 .../builder/script/JRubySingletonTest.java      |  23 +++
 4 files changed, 232 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1c1f6e41/camel-core/src/main/java/org/apache/camel/Exchange.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/Exchange.java b/camel-core/src/main/java/org/apache/camel/Exchange.java
index 0d4165b..a9dbf18 100644
--- a/camel-core/src/main/java/org/apache/camel/Exchange.java
+++ b/camel-core/src/main/java/org/apache/camel/Exchange.java
@@ -179,6 +179,9 @@ public interface Exchange {
     String ROLLBACK_ONLY           = "CamelRollbackOnly";
     String ROLLBACK_ONLY_LAST      = "CamelRollbackOnlyLast";
     String ROUTE_STOP              = "CamelRouteStop";
+
+    String REUSE_SCRIPT_ENGINE = "CamelReuseScripteEngine";
+    String COMPILE_SCRIPT = "CamelCompileScript";
     
     String SAXPARSER_FACTORY   = "CamelSAXParserFactory";
 

http://git-wip-us.apache.org/repos/asf/camel/blob/1c1f6e41/components/camel-script/src/main/java/org/apache/camel/builder/script/ScriptBuilder.java
----------------------------------------------------------------------
diff --git a/components/camel-script/src/main/java/org/apache/camel/builder/script/ScriptBuilder.java b/components/camel-script/src/main/java/org/apache/camel/builder/script/ScriptBuilder.java
index eda38b0..8c038ab 100644
--- a/components/camel-script/src/main/java/org/apache/camel/builder/script/ScriptBuilder.java
+++ b/components/camel-script/src/main/java/org/apache/camel/builder/script/ScriptBuilder.java
@@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory;
  * A builder class for creating {@link Processor}, {@link Expression} and
  * {@link Predicate} objects using the JSR 223 scripting engine.
  *
- * @version 
+ * @version
  */
 public class ScriptBuilder implements Expression, Predicate, Processor {
 
@@ -64,6 +64,7 @@ public class ScriptBuilder implements Expression, Predicate, Processor {
     private Map<String, Object> attributes;
     private final CamelContext camelContext;
     private final ScriptEngineFactory scriptEngineFactory;
+    private final ScriptEngine scriptEngine;
     private final String scriptLanguage;
     private final String scriptResource;
     private final String scriptText;
@@ -107,9 +108,11 @@ public class ScriptBuilder implements Expression, Predicate, Processor {
             this.scriptText = scriptText;
         }
         if (scriptEngineFactory == null) {
+            this.scriptEngine = createScriptEngine(scriptLanguage);
             this.scriptEngineFactory = lookupScriptEngineFactory(scriptLanguage);
         } else {
             this.scriptEngineFactory = scriptEngineFactory;
+            this.scriptEngine = scriptEngineFactory.getScriptEngine();
         }
 
         if (this.scriptEngineFactory == null) {
@@ -133,10 +136,9 @@ public class ScriptBuilder implements Expression, Predicate, Processor {
 
             // pre-compile script if we have it as text
             if (reader != null) {
-                ScriptEngine engine = this.scriptEngineFactory.getScriptEngine();
-                if (engine instanceof Compilable) {
-                    Compilable compilable = (Compilable) engine;
-                    this.compiledScript = compilable.compile(scriptText);
+                if (compileScripte(camelContext) && scriptEngine instanceof Compilable) {
+                    Compilable compilable = (Compilable) scriptEngine;
+                    this.compiledScript = compilable.compile(reader);
                     LOG.debug("Using compiled script: {}", this.compiledScript);
                 }
             }
@@ -373,17 +375,22 @@ public class ScriptBuilder implements Expression, Predicate, Processor {
 
     protected Object evaluateScript(Exchange exchange) {
         try {
-            // get a new engine which we must for each exchange
-            ScriptEngine engine = scriptEngineFactory.getScriptEngine();
-            ScriptContext context = populateBindings(engine, exchange, attributes);
-            addScriptEngineArguments(engine, exchange);
-            Object result = runScript(engine, exchange, context);
-            LOG.debug("The script evaluation result is: {}", result);
-            return result;
+            if (reuseScriptEngine(exchange)) {
+                // It's not safe to do the evaluation with a single scriptEngine
+                synchronized (this) {
+                    LOG.debug("Calling doEvaluateScript without creating a new scriptEngine");
+                    return doEvaluateScript(exchange, scriptEngine);
+                }
+            } else {
+                LOG.debug("Calling doEvaluateScript with a new scriptEngine");
+                // get a new engine which we must for each exchange
+                ScriptEngine engine = scriptEngineFactory.getScriptEngine();
+                return doEvaluateScript(exchange, engine);
+            }
         } catch (ScriptException e) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Script evaluation failed: " + e.getMessage(), e);
-            } 
+            }
             if (e.getCause() != null) {
                 throw createScriptEvaluationException(e.getCause());
             } else {
@@ -394,6 +401,42 @@ public class ScriptBuilder implements Expression, Predicate, Processor {
         }
     }
 
+    protected Object doEvaluateScript(Exchange exchange, ScriptEngine scriptEngine) throws ScriptException, IOException {
+        ScriptContext context = populateBindings(scriptEngine, exchange, attributes);
+        addScriptEngineArguments(scriptEngine, exchange);
+        Object result = runScript(scriptEngine, exchange, context);
+        LOG.debug("The script evaluation result is: {}", result);
+        return result;
+    }
+
+    // To check the camel context property to decide if we need to reuse the ScriptEngine
+    private boolean reuseScriptEngine(Exchange exchange) {
+        CamelContext camelContext = exchange.getContext();
+        if (camelContext != null) {
+            return getCamelContextProperty(camelContext, Exchange.REUSE_SCRIPT_ENGINE);
+        } else {
+            // default value is false
+            return false;
+        }
+    }
+
+    private boolean compileScripte(CamelContext camelContext) {
+        if (camelContext != null) {
+            return getCamelContextProperty(camelContext, Exchange.COMPILE_SCRIPT);
+        } else {
+            return false;
+        }
+    }
+
+    private boolean getCamelContextProperty(CamelContext camelContext, String propertyKey) {
+        String propertyValue =  camelContext.getProperty(propertyKey);
+        if (propertyValue != null) {
+            return camelContext.getTypeConverter().convertTo(boolean.class, propertyValue);
+        } else {
+            return false;
+        }
+    }
+
     protected Object runScript(ScriptEngine engine, Exchange exchange, ScriptContext context) throws ScriptException, IOException {
         Object result = null;
         if (compiledScript != null) {

http://git-wip-us.apache.org/repos/asf/camel/blob/1c1f6e41/components/camel-script/src/test/java/org/apache/camel/builder/script/JRubyScriptThreadSafeTest.java
----------------------------------------------------------------------
diff --git a/components/camel-script/src/test/java/org/apache/camel/builder/script/JRubyScriptThreadSafeTest.java b/components/camel-script/src/test/java/org/apache/camel/builder/script/JRubyScriptThreadSafeTest.java
new file mode 100644
index 0000000..5dd68ff
--- /dev/null
+++ b/components/camel-script/src/test/java/org/apache/camel/builder/script/JRubyScriptThreadSafeTest.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.builder.script;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class JRubyScriptThreadSafeTest extends CamelTestSupport {
+
+    @EndpointInject(uri = "mock:result")
+    MockEndpoint resultEndpoint;
+    @EndpointInject(uri = "mock:error")
+    MockEndpoint errorEndpoint;
+    
+    final int messageCount = 200;
+    final CountDownLatch latch = new CountDownLatch(messageCount);
+    long start;
+
+    @Before
+    public void setUp() throws Exception {
+        setUpEnv();
+        super.setUp();
+    }
+
+    protected void setUpEnv() {
+        System.setProperty("org.jruby.embed.localcontext.scope", "threadsafe");
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() throws Exception {
+                context.getProperties().put(Exchange.REUSE_SCRIPT_ENGINE, "true");
+                context.getProperties().put(Exchange.COMPILE_SCRIPT, "true");
+
+
+                from("seda:parallel?concurrentConsumers=5")
+                    .onException(Exception.class)
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                Throwable ex = exchange.getProperty("CamelExceptionCaught", Throwable.class);
+                                System.out.println(ex);
+                                ex.printStackTrace();
+                                latch.countDown();
+                            }
+                        })
+                        .to(errorEndpoint)
+                    .end()
+                    .to("language:ruby:result = $request.body?cacheScript=true")
+                    .to(resultEndpoint)
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            latch.countDown();
+                        }
+                    });
+
+                from("seda:sequential?concurrentConsumers=1")
+                    .onException(Exception.class)
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                latch.countDown();
+                            }
+                        })
+                        .to(errorEndpoint)
+                    .end()
+                    .to("language:ruby:result = $request.body?cacheScript=true")
+                    .to(resultEndpoint)
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            latch.countDown();
+                        }
+                    });
+            }
+        };
+    }
+
+    @Test
+    public void testParallelLocalContext() throws Exception {
+
+        resultEndpoint.setExpectedMessageCount(messageCount);
+        startStopWatch();
+        for (int i = 1; i < messageCount + 1; i++) {
+            template.sendBody("seda:parallel", "BODY" + i);
+        }
+        latch.await();
+        stopStopWatch();
+        assertMockEndpointsSatisfied(2, TimeUnit.SECONDS);
+        checkResult();
+        
+    }
+
+    @Test
+    public void testSequentialLocalContext() throws Exception {
+        resultEndpoint.setExpectedMessageCount(messageCount);
+        startStopWatch();
+        for (int i = 1; i < messageCount + 1; i++) {
+            template.sendBody("seda:sequential", "BODY" + i);
+        }
+        latch.await();
+        stopStopWatch();
+        assertMockEndpointsSatisfied(2, TimeUnit.SECONDS);
+        checkResult();
+        
+    }
+
+    private void checkResult() {
+        Set<String> bodies = new HashSet<String>();
+        for (Exchange exchange : resultEndpoint.getReceivedExchanges()) {
+            bodies.add(exchange.getIn().getBody(String.class));
+        }
+        Assert.assertEquals("duplicate bodies:", messageCount, bodies.size());
+    }
+
+    private void startStopWatch() {
+        start = System.currentTimeMillis();
+    }
+
+    private void stopStopWatch() {
+        System.out.println(this.getTestMethodName() + " processing time: " + (System.currentTimeMillis() - start) + "ms.");
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/camel/blob/1c1f6e41/components/camel-script/src/test/java/org/apache/camel/builder/script/JRubySingletonTest.java
----------------------------------------------------------------------
diff --git a/components/camel-script/src/test/java/org/apache/camel/builder/script/JRubySingletonTest.java b/components/camel-script/src/test/java/org/apache/camel/builder/script/JRubySingletonTest.java
new file mode 100644
index 0000000..687476d
--- /dev/null
+++ b/components/camel-script/src/test/java/org/apache/camel/builder/script/JRubySingletonTest.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.builder.script;
+
+public class JRubySingletonTest extends JRubyScriptThreadSafeTest {
+    protected void setUpEnv() {
+        System.setProperty("org.jruby.embed.localcontext.scope", "singleton");
+    }
+}