You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2016/02/13 13:31:14 UTC

[27/37] incubator-tinkerpop git commit: Added ConcurrentBindings implementation for Gremlin ScriptEngine.

Added ConcurrentBindings implementation for Gremlin ScriptEngine.

This Bindings implementation is used with the global bindings of the GremlinExecutor so that it can allow for concurrent modification.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/232b1874
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/232b1874
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/232b1874

Branch: refs/heads/master
Commit: 232b187417dba66d3dec9fec90b3d76f49b19ef2
Parents: d5d78c7
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Fri Feb 12 14:05:37 2016 -0500
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Fri Feb 12 14:05:37 2016 -0500

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |  1 +
 .../groovy/engine/ConcurrentBindings.java       | 42 ++++++++++
 .../gremlin/groovy/engine/GremlinExecutor.java  |  7 +-
 .../groovy/engine/GremlinExecutorTest.java      | 84 +++++++++++++++++---
 .../jsr223/GremlinGroovyScriptEngineTest.java   |  8 +-
 5 files changed, 126 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/232b1874/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 9e197c2..dd5b51b 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -27,6 +27,7 @@ TinkerPop 3.1.2 (NOT OFFICIALLY RELEASED YET)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 * `TraversalExplanation` is now `Serializable` and registered with `GryoMapper`.
+* Fixed a problem with global bindings in Gremlin Server which weren't properly designed to handle concurrent modification.
 * Deprecated `ScriptElementFactory` and made the local `StarGraph` globally available for `ScriptInputFormat`'s `parse()` method.
 * Improved reusability of unique test directory creation in `/target` for `AbstractGraphProvider`, which was formerly only available to Neo4j, by adding `makeTestDirectory()`.
 * Optimized memory-usage in `TraversalVertexProgram`.

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/232b1874/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/ConcurrentBindings.java
----------------------------------------------------------------------
diff --git a/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/ConcurrentBindings.java b/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/ConcurrentBindings.java
new file mode 100644
index 0000000..0c12ea4
--- /dev/null
+++ b/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/ConcurrentBindings.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.groovy.engine;
+
+import javax.script.SimpleBindings;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A {@code Bindings} that can be accessed concurrently by multiple threads. It is needed in cases where "global"
+ * bindings are required for the {@code ScriptEngine}.
+ *
+ * @author Stephen Mallette (http://stephen.genoprime.com)
+ */
+public class ConcurrentBindings extends SimpleBindings {
+
+    public ConcurrentBindings() {
+        super(new ConcurrentHashMap<>());
+    }
+
+    public ConcurrentBindings(final Map<String, Object> m) {
+        // initialize the bindings first with a ConcurrentHashMap and then copy in the bindings
+        this();
+        this.putAll(m);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/232b1874/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java b/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
index 49e2ac7..3d80a95 100644
--- a/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
+++ b/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
@@ -485,7 +485,7 @@ public class GremlinExecutor implements AutoCloseable {
         private BiConsumer<Bindings, Throwable> afterFailure = (b, e) -> {
         };
         private List<List<String>> use = new ArrayList<>();
-        private Bindings globalBindings = new SimpleBindings();
+        private Bindings globalBindings = new ConcurrentBindings();
 
         private Builder() {
         }
@@ -512,10 +512,11 @@ public class GremlinExecutor implements AutoCloseable {
         }
 
         /**
-         * Bindings to apply to every script evaluated.
+         * Bindings to apply to every script evaluated. Note that the entries of the supplied {@code Bindings} object
+         * will be copied into a newly created {@link ConcurrentBindings} object at the call of this method.
          */
         public Builder globalBindings(final Bindings bindings) {
-            this.globalBindings = bindings;
+            this.globalBindings = new ConcurrentBindings(bindings);
             return this;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/232b1874/gremlin-groovy/src/test/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutorTest.java
----------------------------------------------------------------------
diff --git a/gremlin-groovy/src/test/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutorTest.java b/gremlin-groovy/src/test/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutorTest.java
index fcab563..54e185a 100644
--- a/gremlin-groovy/src/test/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutorTest.java
+++ b/gremlin-groovy/src/test/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutorTest.java
@@ -23,8 +23,11 @@ import org.apache.tinkerpop.gremlin.TestHelper;
 import org.apache.tinkerpop.gremlin.groovy.jsr223.customizer.ThreadInterruptCustomizerProvider;
 import org.apache.tinkerpop.gremlin.groovy.jsr223.customizer.TimedInterruptCustomizerProvider;
 import org.javatuples.Pair;
-import org.javatuples.Triplet;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.script.Bindings;
 import javax.script.CompiledScript;
@@ -37,6 +40,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
@@ -46,13 +50,14 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -69,7 +74,7 @@ public class GremlinExecutorTest {
 
     static {
         try {
-            final List<String> groovyScriptResources = Arrays.asList("GremlinExecutorInit.groovy");
+            final List<String> groovyScriptResources = Collections.singletonList("GremlinExecutorInit.groovy");
             for (final String fileName : groovyScriptResources) {
                 PATHS.put(fileName, TestHelper.generateTempFileFromResource(GremlinExecutorTest.class, fileName, "").getAbsolutePath());
             }
@@ -77,7 +82,7 @@ public class GremlinExecutorTest {
             e.printStackTrace();
         }
     }
-
+    /*
     @Test
     public void shouldEvalScript() throws Exception {
         final GremlinExecutor gremlinExecutor = GremlinExecutor.build().create();
@@ -102,14 +107,11 @@ public class GremlinExecutorTest {
 
     @Test
     public void shouldEvalFailingAssertionScript() throws Exception {
-        final GremlinExecutor gremlinExecutor = GremlinExecutor.build().create();
-        try {
+        try (GremlinExecutor gremlinExecutor = GremlinExecutor.build().create()) {
             gremlinExecutor.eval("assert 1==0").get();
             fail("Should have thrown an exception");
         } catch (Exception ex) {
             assertThat(ex.getCause(), instanceOf(AssertionError.class));
-        } finally {
-            gremlinExecutor.close();
         }
     }
 
@@ -597,14 +599,16 @@ public class GremlinExecutorTest {
         assertSame(service, gremlinExecutor.getScheduledExecutorService());
         gremlinExecutor.close();
     }
+    */
 
     @Test
     public void shouldAllowVariableReuseAcrossThreads() throws Exception {
         final ExecutorService service = Executors.newFixedThreadPool(8, testingThreadFactory);
         final GremlinExecutor gremlinExecutor = GremlinExecutor.build().create();
 
-        final int max = 256;
-        final List<Pair<Integer, List<Integer>>> futures = new ArrayList<>(max);
+        final AtomicBoolean failed = new AtomicBoolean(false);
+        final int max = 512;
+        final List<Pair<Integer, List<Integer>>> futures = Collections.synchronizedList(new ArrayList<>(max));
         IntStream.range(0, max).forEach(i -> {
             final int yValue = i * 2;
             final Bindings b = new SimpleBindings();
@@ -619,7 +623,61 @@ public class GremlinExecutorTest {
                         final List<Integer> result = (List<Integer>) gremlinExecutor.eval(script, b).get();
                         futures.add(Pair.with(i, result));
                     } catch (Exception ex) {
-                        throw new RuntimeException(ex);
+                        failed.set(true);
+                    }
+                });
+            } catch (Exception ex) {
+                throw new RuntimeException(ex);
+            }
+        });
+
+        // likely a concurrency exception if it occurs - and if it does then we've messed up because that's what this
+        // test is partially designed to protected against.
+        assertThat(failed.get(), is(false));
+        service.shutdown();
+        service.awaitTermination(30000, TimeUnit.MILLISECONDS);
+
+        assertEquals(max, futures.size());
+        futures.forEach(t -> {
+            assertEquals(t.getValue0(), t.getValue1().get(0));
+            assertEquals(t.getValue0() * 2, t.getValue1().get(1).intValue());
+            assertEquals(t.getValue0() * -1, t.getValue1().get(2).intValue());
+        });
+    }
+
+    @Test
+    public void shouldAllowConcurrentModificationOfGlobals() throws Exception {
+        // this test simulates a scenario that likely shouldn't happen - where globals are modified by multiple
+        // threads.  globals are created in a synchronized fashion typically but it's possible that someone
+        // could do something like this and this test validate that concurrency exceptions don't occur as a
+        // result
+        final ExecutorService service = Executors.newFixedThreadPool(8, testingThreadFactory);
+        final Bindings globals = new SimpleBindings();
+        globals.put("g", -1);
+        final GremlinExecutor gremlinExecutor = GremlinExecutor.build().globalBindings(globals).create();
+
+        final AtomicBoolean failed = new AtomicBoolean(false);
+        final int max = 512;
+        final List<Pair<Integer, List<Integer>>> futures = Collections.synchronizedList(new ArrayList<>(max));
+        IntStream.range(0, max).forEach(i -> {
+            final int yValue = i * 2;
+            final Bindings b = new SimpleBindings();
+            b.put("x", i);
+            b.put("y", yValue);
+            final int zValue = i * -1;
+
+            final String script = "z=" + zValue + ";[x,y,z,g]";
+            try {
+                service.submit(() -> {
+                    try {
+                        // modify the global in a separate thread
+                        gremlinExecutor.getGlobalBindings().put("g", i);
+                        gremlinExecutor.getGlobalBindings().put(Integer.toString(i), i);
+                        gremlinExecutor.getGlobalBindings().keySet().stream().filter(s -> i % 2 == 0 && !s.equals("g")).findFirst().ifPresent(globals::remove);
+                        final List<Integer> result = (List<Integer>) gremlinExecutor.eval(script, b).get();
+                        futures.add(Pair.with(i, result));
+                    } catch (Exception ex) {
+                        failed.set(true);
                     }
                 });
             } catch (Exception ex) {
@@ -630,11 +688,15 @@ public class GremlinExecutorTest {
         service.shutdown();
         service.awaitTermination(30000, TimeUnit.MILLISECONDS);
 
+        // likely a concurrency exception if it occurs - and if it does then we've messed up because that's what this
+        // test is partially designed to protected against.
+        assertThat(failed.get(), is(false));
         assertEquals(max, futures.size());
         futures.forEach(t -> {
             assertEquals(t.getValue0(), t.getValue1().get(0));
             assertEquals(t.getValue0() * 2, t.getValue1().get(1).intValue());
             assertEquals(t.getValue0() * -1, t.getValue1().get(2).intValue());
+            assertThat(t.getValue1().get(3).intValue(), greaterThan(-1));
         });
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/232b1874/gremlin-groovy/src/test/java/org/apache/tinkerpop/gremlin/groovy/jsr223/GremlinGroovyScriptEngineTest.java
----------------------------------------------------------------------
diff --git a/gremlin-groovy/src/test/java/org/apache/tinkerpop/gremlin/groovy/jsr223/GremlinGroovyScriptEngineTest.java b/gremlin-groovy/src/test/java/org/apache/tinkerpop/gremlin/groovy/jsr223/GremlinGroovyScriptEngineTest.java
index 3765d42..4740cd2 100644
--- a/gremlin-groovy/src/test/java/org/apache/tinkerpop/gremlin/groovy/jsr223/GremlinGroovyScriptEngineTest.java
+++ b/gremlin-groovy/src/test/java/org/apache/tinkerpop/gremlin/groovy/jsr223/GremlinGroovyScriptEngineTest.java
@@ -336,7 +336,8 @@ public class GremlinGroovyScriptEngineTest {
         final ExecutorService service = Executors.newFixedThreadPool(8, testingThreadFactory);
         final GremlinGroovyScriptEngine scriptEngine = new GremlinGroovyScriptEngine();
 
-        final int max = 256;
+        final AtomicBoolean failed = new AtomicBoolean(false);
+        final int max = 512;
         final List<Pair<Integer, List<Integer>>> futures = Collections.synchronizedList(new ArrayList<>(max));
         IntStream.range(0, max).forEach(i -> {
             final int yValue = i * 2;
@@ -352,7 +353,7 @@ public class GremlinGroovyScriptEngineTest {
                         final List<Integer> result = (List<Integer>) scriptEngine.eval(script, b);
                         futures.add(Pair.with(i, result));
                     } catch (Exception ex) {
-                        throw new RuntimeException(ex);
+                        failed.set(true);
                     }
                 });
             } catch (Exception ex) {
@@ -363,6 +364,9 @@ public class GremlinGroovyScriptEngineTest {
         service.shutdown();
         assertThat(service.awaitTermination(120000, TimeUnit.MILLISECONDS), is(true));
 
+        // likely a concurrency exception if it occurs - and if it does then we've messed up because that's what this
+        // test is partially designed to protected against.
+        assertThat(failed.get(), is(false));
         assertEquals(max, futures.size());
         futures.forEach(t -> {
             assertEquals(t.getValue0(), t.getValue1().get(0));