You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/02/11 21:41:21 UTC

incubator-tinkerpop git commit: Thread pools are properly shutdown if internal ones are used. #514

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 1734d7a68 -> 4050ded7a


Thread pools are properly shutdown if internal ones are used. #514

Also did a fair amount of thread naming in tests to help with profiling.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/4050ded7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/4050ded7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/4050ded7

Branch: refs/heads/master
Commit: 4050ded7aa3e015c921acd9b40da4fb91502a20a
Parents: 1734d7a
Author: Stephen Mallette <sp...@apache.org>
Authored: Wed Feb 11 15:40:10 2015 -0500
Committer: Stephen Mallette <sp...@apache.org>
Committed: Wed Feb 11 15:40:10 2015 -0500

----------------------------------------------------------------------
 .../groovy/engine/GremlinExecutorTest.java      | 62 ++++++++++++++++---
 .../jsr223/GremlinGroovyScriptEngineTest.java   |  8 +--
 .../gremlin/groovy/engine/GremlinExecutor.java  | 65 +++++++++++++++++---
 3 files changed, 117 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4050ded7/gremlin-groovy-test/src/main/java/com/tinkerpop/gremlin/groovy/engine/GremlinExecutorTest.java
----------------------------------------------------------------------
diff --git a/gremlin-groovy-test/src/main/java/com/tinkerpop/gremlin/groovy/engine/GremlinExecutorTest.java b/gremlin-groovy-test/src/main/java/com/tinkerpop/gremlin/groovy/engine/GremlinExecutorTest.java
index 1f26ba4..757b98e 100644
--- a/gremlin-groovy-test/src/main/java/com/tinkerpop/gremlin/groovy/engine/GremlinExecutorTest.java
+++ b/gremlin-groovy-test/src/main/java/com/tinkerpop/gremlin/groovy/engine/GremlinExecutorTest.java
@@ -23,6 +23,7 @@ import com.tinkerpop.gremlin.LoadGraphWith;
 import com.tinkerpop.gremlin.TestHelper;
 import com.tinkerpop.gremlin.groovy.jsr223.GremlinGroovyScriptEngineTest;
 import com.tinkerpop.gremlin.structure.Graph;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.junit.Test;
 import org.kohsuke.groovy.sandbox.GroovyInterceptor;
 
@@ -54,6 +55,7 @@ import static org.junit.Assert.*;
  */
 public class GremlinExecutorTest extends AbstractGremlinTest {
     public static Map<String, String> PATHS = new HashMap<>();
+    private final BasicThreadFactory testingThreadFactory = new BasicThreadFactory.Builder().namingPattern("test-gremlin-executor-%d").build();
 
     static {
         try {
@@ -70,6 +72,7 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
     public void shouldEvalScript() throws Exception {
         final GremlinExecutor gremlinExecutor = GremlinExecutor.build().create();
         assertEquals(2, gremlinExecutor.eval("1+1").get());
+        gremlinExecutor.close();
     }
 
     @Test
@@ -81,6 +84,7 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
         assertEquals(5, gremlinExecutor.eval("1+4").get());
         assertEquals(6, gremlinExecutor.eval("1+5").get());
         assertEquals(7, gremlinExecutor.eval("1+6").get());
+        gremlinExecutor.close();
     }
 
     @Test
@@ -89,6 +93,7 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
         final Bindings b = new SimpleBindings();
         b.put("x", 1);
         assertEquals(2, gremlinExecutor.eval("1+x", b).get());
+        gremlinExecutor.close();
     }
 
     @Test
@@ -97,6 +102,7 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
         b.put("x", 1);
         final GremlinExecutor gremlinExecutor = GremlinExecutor.build().globalBindings(b).create();
         assertEquals(2, gremlinExecutor.eval("1+x").get());
+        gremlinExecutor.close();
     }
 
     @Test
@@ -107,6 +113,7 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
         final Bindings b = new SimpleBindings();
         b.put("y", 1);
         assertEquals(2, gremlinExecutor.eval("y+x", b).get());
+        gremlinExecutor.close();
     }
 
     @Test
@@ -117,6 +124,7 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
         final Bindings b = new SimpleBindings();
         b.put("x", 10);
         assertEquals(11, gremlinExecutor.eval("x+1", b).get());
+        gremlinExecutor.close();
     }
 
     @Test
@@ -143,6 +151,7 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
         assertFalse(successCalled.get());
         assertFalse(failureCalled.get());
         assertEquals(0, timeOutCount.getCount());
+        gremlinExecutor.close();
     }
 
     @Test
@@ -167,6 +176,7 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
         assertFalse(timeoutCalled.get());
         assertFalse(successCalled.get());
         assertTrue(failureCalled.get());
+        gremlinExecutor.close();
     }
 
     @Test
@@ -186,16 +196,17 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
         assertFalse(timeoutCalled.get());
         assertTrue(successCalled.get());
         assertFalse(failureCalled.get());
+        gremlinExecutor.close();
     }
 
     @Test
     @LoadGraphWith(LoadGraphWith.GraphData.MODERN)
-    public void shouldAllowTraversalToIterateInDifferentThreadThanOriginallyEvaluatedWithAutoCommit() {
+    public void shouldAllowTraversalToIterateInDifferentThreadThanOriginallyEvaluatedWithAutoCommit() throws Exception {
         // this test sort of simulates Gremlin Server interaction where a Traversal is eval'd in one Thread, but
         // then iterated in another.  note that Gremlin Server configures the script engine to auto-commit
         // after evaluation.  this basically tests the state of the Gremlin Server GremlinExecutor when
         // being used in sessionless mode
-        final ExecutorService evalExecutor = Executors.newSingleThreadExecutor();
+        final ExecutorService evalExecutor = Executors.newSingleThreadExecutor(testingThreadFactory);
         final GremlinExecutor gremlinExecutor = GremlinExecutor.build()
                 .afterSuccess(b -> {
                     final Graph graph = (Graph) b.get("g");
@@ -209,22 +220,28 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
 
         final AtomicInteger vertexCount = new AtomicInteger(0);
 
-        final ExecutorService iterationExecutor = Executors.newSingleThreadExecutor();
+        final ExecutorService iterationExecutor = Executors.newSingleThreadExecutor(testingThreadFactory);
         gremlinExecutor.eval("g.V().out()", bindings).thenAcceptAsync(o -> {
             final Iterator itty = (Iterator) o;
             itty.forEachRemaining(v -> vertexCount.incrementAndGet());
         }, iterationExecutor).join();
 
         assertEquals(6, vertexCount.get());
+
+        gremlinExecutor.close();
+        evalExecutor.shutdown();
+        evalExecutor.awaitTermination(30000, TimeUnit.MILLISECONDS);
+        iterationExecutor.shutdown();
+        iterationExecutor.awaitTermination(30000, TimeUnit.MILLISECONDS);
     }
 
     @Test
     @LoadGraphWith(LoadGraphWith.GraphData.MODERN)
-    public void shouldAllowTraversalToIterateInDifferentThreadThanOriginallyEvaluatedWithoutAutoCommit() {
+    public void shouldAllowTraversalToIterateInDifferentThreadThanOriginallyEvaluatedWithoutAutoCommit() throws Exception {
         // this test sort of simulates Gremlin Server interaction where a Traversal is eval'd in one Thread, but
         // then iterated in another.  this basically tests the state of the Gremlin Server GremlinExecutor when
         // being used in session mode
-        final ExecutorService evalExecutor = Executors.newSingleThreadExecutor();
+        final ExecutorService evalExecutor = Executors.newSingleThreadExecutor(testingThreadFactory);
         final GremlinExecutor gremlinExecutor = GremlinExecutor.build().executorService(evalExecutor).create();
 
         final Map<String,Object> bindings = new HashMap<>();
@@ -232,13 +249,19 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
 
         final AtomicInteger vertexCount = new AtomicInteger(0);
 
-        final ExecutorService iterationExecutor = Executors.newSingleThreadExecutor();
+        final ExecutorService iterationExecutor = Executors.newSingleThreadExecutor(testingThreadFactory);
         gremlinExecutor.eval("g.V().out()", bindings).thenAcceptAsync(o -> {
             final Iterator itty = (Iterator) o;
             itty.forEachRemaining(v -> vertexCount.incrementAndGet());
         }, iterationExecutor).join();
 
         assertEquals(6, vertexCount.get());
+
+        gremlinExecutor.close();
+        evalExecutor.shutdown();
+        evalExecutor.awaitTermination(30000, TimeUnit.MILLISECONDS);
+        iterationExecutor.shutdown();
+        iterationExecutor.awaitTermination(30000, TimeUnit.MILLISECONDS);
     }
 
     @Test
@@ -278,13 +301,16 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
         assertEquals(2, i2.get());
         assertFalse(b1.get());
         assertFalse(b2.get());
+
+        gremlinExecutor.close();
+
     }
 
     @Test
     public void shouldNotExhaustThreads() throws Exception {
         // this is not representative of how the GremlinExecutor should be configured.  A single thread executor
         // shared will create odd behaviors, but it's good for this test.
-        final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+        final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(testingThreadFactory);
         final GremlinExecutor gremlinExecutor = GremlinExecutor.build()
                 .executorService(executorService)
                 .scheduledExecutorService(executorService).create();
@@ -301,6 +327,9 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
         }));
 
         assertEquals(1000, count.intValue());
+
+        executorService.shutdown();
+        executorService.awaitTermination(30000, TimeUnit.MILLISECONDS);
     }
 
     @Test
@@ -341,6 +370,8 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
 
         assertTrue(successes.intValue() > 0);
         assertTrue(failures.intValue() >= 500);
+
+        gremlinExecutor.close();
     }
 
     @Test
@@ -354,6 +385,8 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
                 .create();
 
         assertEquals(2, gremlinExecutor.eval("add(1,1)").get());
+
+        gremlinExecutor.close();
     }
 
     @Test
@@ -422,5 +455,20 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
         gremlinExecutor.getScriptEngines().reset();
 
         assertEquals(2, gremlinExecutor.eval("add(1,1)").get());
+
+        gremlinExecutor.close();
+    }
+
+    @Test
+    public void shouldNotShutdownExecutorServicesSuppliedToGremlinExecutor() throws Exception {
+        final ScheduledExecutorService service = Executors.newScheduledThreadPool(4, testingThreadFactory);
+        final GremlinExecutor gremlinExecutor = GremlinExecutor.build()
+                .executorService(service)
+                .scheduledExecutorService(service).create();
+
+        gremlinExecutor.close();
+        assertFalse(service.isShutdown());
+        service.shutdown();
+        service.awaitTermination(30000, TimeUnit.MILLISECONDS);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4050ded7/gremlin-groovy-test/src/main/java/com/tinkerpop/gremlin/groovy/jsr223/GremlinGroovyScriptEngineTest.java
----------------------------------------------------------------------
diff --git a/gremlin-groovy-test/src/main/java/com/tinkerpop/gremlin/groovy/jsr223/GremlinGroovyScriptEngineTest.java b/gremlin-groovy-test/src/main/java/com/tinkerpop/gremlin/groovy/jsr223/GremlinGroovyScriptEngineTest.java
index 1f7dbe4..324eea0 100644
--- a/gremlin-groovy-test/src/main/java/com/tinkerpop/gremlin/groovy/jsr223/GremlinGroovyScriptEngineTest.java
+++ b/gremlin-groovy-test/src/main/java/com/tinkerpop/gremlin/groovy/jsr223/GremlinGroovyScriptEngineTest.java
@@ -230,7 +230,7 @@ public class GremlinGroovyScriptEngineTest extends AbstractGremlinTest {
         final Random random = new Random();
 
         for (int i = 0; i < runs; i++) {
-            new Thread() {
+            new Thread("test-thread-safe-" + i) {
                 public void run() {
                     String name = names.get(random.nextInt(names.size() - 1));
                     try {
@@ -264,7 +264,7 @@ public class GremlinGroovyScriptEngineTest extends AbstractGremlinTest {
         final Random random = new Random();
 
         for (int i = 0; i < runs; i++) {
-            new Thread() {
+            new Thread("test-thread-safety-on-compiled-script-" + i) {
                 public void run() {
                     String name = names.get(random.nextInt(names.size() - 1));
                     try {
@@ -459,7 +459,7 @@ public class GremlinGroovyScriptEngineTest extends AbstractGremlinTest {
                 se.printStackTrace();
                 fail.set(true);
             }
-        });
+        }, "test-reload-classloader-1");
 
         t.start();
 
@@ -473,7 +473,7 @@ public class GremlinGroovyScriptEngineTest extends AbstractGremlinTest {
             }};
             scriptEngine.addImports(imports);
             latch.countDown();
-        }).start();
+        }, "test-reload-classloader-2").start();
 
         t.join();
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4050ded7/gremlin-groovy/src/main/java/com/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-groovy/src/main/java/com/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java b/gremlin-groovy/src/main/java/com/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
index f1184ad..51c668b 100644
--- a/gremlin-groovy/src/main/java/com/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
+++ b/gremlin-groovy/src/main/java/com/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
@@ -52,7 +52,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
-import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
 
 /**
@@ -88,13 +87,15 @@ public class GremlinExecutor implements AutoCloseable {
     private final Consumer<Bindings> afterTimeout;
     private final BiConsumer<Bindings, Exception> afterFailure;
     private final Set<String> enabledPlugins;
+    private final boolean suppliedExecutor;
+    private final boolean suppliedScheduledExecutor;
 
     private GremlinExecutor(final Map<String, EngineSettings> settings, final List<List<String>> use,
                             final long scriptEvaluationTimeout, final Bindings globalBindings,
                             final ExecutorService executorService, final ScheduledExecutorService scheduledExecutorService,
                             final Consumer<Bindings> beforeEval, final Consumer<Bindings> afterSuccess,
                             final Consumer<Bindings> afterTimeout, final BiConsumer<Bindings, Exception> afterFailure,
-                            final Set<String> enabledPlugins) {
+                            final Set<String> enabledPlugins, final boolean suppliedExecutor, final boolean suppliedScheduledExecutor) {
         this.executorService = executorService;
         this.scheduledExecutorService = scheduledExecutorService;
         this.beforeEval = beforeEval;
@@ -107,6 +108,8 @@ public class GremlinExecutor implements AutoCloseable {
         this.globalBindings = globalBindings;
         this.enabledPlugins = enabledPlugins;
         this.scriptEngines = createScriptEngines();
+        this.suppliedExecutor = suppliedExecutor;
+        this.suppliedScheduledExecutor = suppliedScheduledExecutor;
     }
 
     public CompletableFuture<Object> eval(final String script) {
@@ -193,17 +196,59 @@ public class GremlinExecutor implements AutoCloseable {
     /**
      * {@inheritDoc}
      * <p/>
-     * Note that the executors are not closed by virtue of this operation.  Manage them manually.
+     * Executors are only closed if they were not supplied externally in the
+     * {@link com.tinkerpop.gremlin.groovy.engine.GremlinExecutor.Builder}
      */
     @Override
     public void close() throws Exception {
-        // leave pools running as they are supplied externally.  let the sender be responsible for shutting them down
-        scriptEngines.close();
+        closeAsync().join();
+    }
+
+    /**
+     * Executors are only closed if they were not supplied externally in the
+     * {@link com.tinkerpop.gremlin.groovy.engine.GremlinExecutor.Builder}
+     */
+    public CompletableFuture<Void> closeAsync() throws Exception {
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+
+        new Thread(() -> {
+            // leave pools running if they are supplied externally.  let the sender be responsible for shutting them down
+            if (!suppliedExecutor) {
+                executorService.shutdown();
+                try {
+                    if (!executorService.awaitTermination(180000, TimeUnit.MILLISECONDS))
+                        logger.warn("Timeout while waiting for ExecutorService of GremlinExecutor to shutdown.");
+                } catch (InterruptedException ie) {
+                    logger.warn("ExecutorService on GremlinExecutor may not have shutdown properly as shutdown thread terminated early.");
+                }
+            }
+
+            // calls to shutdown are idempotent so no problems calling it twice if the pool is shared
+            if (!suppliedScheduledExecutor) {
+                scheduledExecutorService.shutdown();
+                try {
+                    if (!scheduledExecutorService.awaitTermination(180000, TimeUnit.MILLISECONDS))
+                        logger.warn("Timeout while waiting for ScheduledExecutorService of GremlinExecutor to shutdown.");
+                } catch (InterruptedException ie) {
+                    logger.warn("ScheduledExecutorService on GremlinExecutor may not have shutdown properly as shutdown thread terminated early.");
+                }
+            }
+
+            try {
+                scriptEngines.close();
+            } catch (Exception ex) {
+                logger.warn("Error while shutting down the ScriptEngines in the GremlinExecutor", ex);
+            }
+
+            future.complete(null);
+        }, "gremlin-executor-close").start();
+
+        return future;
     }
 
     private void scheduleTimeout(final CompletableFuture<Object> evaluationFuture, final String script, final AtomicBoolean abort) {
         if (scriptEvaluationTimeout > 0) {
-            // Schedule a timeout in the io threadpool for future execution - killing an eval is cheap
+            // Schedule a timeout in the threadpool for future execution - killing an eval is cheap
             final ScheduledFuture<?> sf = scheduledExecutorService.schedule(() -> {
                 logger.info("Timing out script - {} - in thread [{}]", script, Thread.currentThread().getName());
 
@@ -332,6 +377,9 @@ public class GremlinExecutor implements AutoCloseable {
         private List<List<String>> use = new ArrayList<>();
         private Bindings globalBindings = new SimpleBindings();
 
+        private boolean suppliedExecutor = false;
+        private boolean suppliedScheduledExecutor = false;
+
         private Builder() {
             final BasicThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("gremlin-executor-%d").build();
             this.scheduledExecutorService = Executors.newScheduledThreadPool(4, threadFactory);
@@ -390,6 +438,7 @@ public class GremlinExecutor implements AutoCloseable {
          */
         public Builder executorService(final ExecutorService executorService) {
             this.executorService = executorService;
+            this.suppliedExecutor = true;
             return this;
         }
 
@@ -398,6 +447,7 @@ public class GremlinExecutor implements AutoCloseable {
          */
         public Builder scheduledExecutorService(final ScheduledExecutorService scheduledExecutorService) {
             this.scheduledExecutorService = scheduledExecutorService;
+            this.suppliedScheduledExecutor = true;
             return this;
         }
 
@@ -451,7 +501,8 @@ public class GremlinExecutor implements AutoCloseable {
 
         public GremlinExecutor create() {
             return new GremlinExecutor(settings, use, scriptEvaluationTimeout, globalBindings, executorService,
-                    scheduledExecutorService, beforeEval, afterSuccess, afterTimeout, afterFailure, enabledPlugins);
+                    scheduledExecutorService, beforeEval, afterSuccess, afterTimeout, afterFailure, enabledPlugins,
+                    suppliedExecutor, suppliedScheduledExecutor);
         }
     }