You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/02/11 21:41:21 UTC
incubator-tinkerpop git commit: Thread pools are properly shut down if
internal ones are used. #514
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master 1734d7a68 -> 4050ded7a
Thread pools are properly shut down if internal ones are used. #514
Also named the threads created in tests so they are easier to identify when profiling.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/4050ded7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/4050ded7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/4050ded7
Branch: refs/heads/master
Commit: 4050ded7aa3e015c921acd9b40da4fb91502a20a
Parents: 1734d7a
Author: Stephen Mallette <sp...@apache.org>
Authored: Wed Feb 11 15:40:10 2015 -0500
Committer: Stephen Mallette <sp...@apache.org>
Committed: Wed Feb 11 15:40:10 2015 -0500
----------------------------------------------------------------------
.../groovy/engine/GremlinExecutorTest.java | 62 ++++++++++++++++---
.../jsr223/GremlinGroovyScriptEngineTest.java | 8 +--
.../gremlin/groovy/engine/GremlinExecutor.java | 65 +++++++++++++++++---
3 files changed, 117 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4050ded7/gremlin-groovy-test/src/main/java/com/tinkerpop/gremlin/groovy/engine/GremlinExecutorTest.java
----------------------------------------------------------------------
diff --git a/gremlin-groovy-test/src/main/java/com/tinkerpop/gremlin/groovy/engine/GremlinExecutorTest.java b/gremlin-groovy-test/src/main/java/com/tinkerpop/gremlin/groovy/engine/GremlinExecutorTest.java
index 1f26ba4..757b98e 100644
--- a/gremlin-groovy-test/src/main/java/com/tinkerpop/gremlin/groovy/engine/GremlinExecutorTest.java
+++ b/gremlin-groovy-test/src/main/java/com/tinkerpop/gremlin/groovy/engine/GremlinExecutorTest.java
@@ -23,6 +23,7 @@ import com.tinkerpop.gremlin.LoadGraphWith;
import com.tinkerpop.gremlin.TestHelper;
import com.tinkerpop.gremlin.groovy.jsr223.GremlinGroovyScriptEngineTest;
import com.tinkerpop.gremlin.structure.Graph;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.junit.Test;
import org.kohsuke.groovy.sandbox.GroovyInterceptor;
@@ -54,6 +55,7 @@ import static org.junit.Assert.*;
*/
public class GremlinExecutorTest extends AbstractGremlinTest {
public static Map<String, String> PATHS = new HashMap<>();
+ private final BasicThreadFactory testingThreadFactory = new BasicThreadFactory.Builder().namingPattern("test-gremlin-executor-%d").build();
static {
try {
@@ -70,6 +72,7 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
public void shouldEvalScript() throws Exception {
final GremlinExecutor gremlinExecutor = GremlinExecutor.build().create();
assertEquals(2, gremlinExecutor.eval("1+1").get());
+ gremlinExecutor.close();
}
@Test
@@ -81,6 +84,7 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
assertEquals(5, gremlinExecutor.eval("1+4").get());
assertEquals(6, gremlinExecutor.eval("1+5").get());
assertEquals(7, gremlinExecutor.eval("1+6").get());
+ gremlinExecutor.close();
}
@Test
@@ -89,6 +93,7 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
final Bindings b = new SimpleBindings();
b.put("x", 1);
assertEquals(2, gremlinExecutor.eval("1+x", b).get());
+ gremlinExecutor.close();
}
@Test
@@ -97,6 +102,7 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
b.put("x", 1);
final GremlinExecutor gremlinExecutor = GremlinExecutor.build().globalBindings(b).create();
assertEquals(2, gremlinExecutor.eval("1+x").get());
+ gremlinExecutor.close();
}
@Test
@@ -107,6 +113,7 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
final Bindings b = new SimpleBindings();
b.put("y", 1);
assertEquals(2, gremlinExecutor.eval("y+x", b).get());
+ gremlinExecutor.close();
}
@Test
@@ -117,6 +124,7 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
final Bindings b = new SimpleBindings();
b.put("x", 10);
assertEquals(11, gremlinExecutor.eval("x+1", b).get());
+ gremlinExecutor.close();
}
@Test
@@ -143,6 +151,7 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
assertFalse(successCalled.get());
assertFalse(failureCalled.get());
assertEquals(0, timeOutCount.getCount());
+ gremlinExecutor.close();
}
@Test
@@ -167,6 +176,7 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
assertFalse(timeoutCalled.get());
assertFalse(successCalled.get());
assertTrue(failureCalled.get());
+ gremlinExecutor.close();
}
@Test
@@ -186,16 +196,17 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
assertFalse(timeoutCalled.get());
assertTrue(successCalled.get());
assertFalse(failureCalled.get());
+ gremlinExecutor.close();
}
@Test
@LoadGraphWith(LoadGraphWith.GraphData.MODERN)
- public void shouldAllowTraversalToIterateInDifferentThreadThanOriginallyEvaluatedWithAutoCommit() {
+ public void shouldAllowTraversalToIterateInDifferentThreadThanOriginallyEvaluatedWithAutoCommit() throws Exception {
// this test sort of simulates Gremlin Server interaction where a Traversal is eval'd in one Thread, but
// then iterated in another. note that Gremlin Server configures the script engine to auto-commit
// after evaluation. this basically tests the state of the Gremlin Server GremlinExecutor when
// being used in sessionless mode
- final ExecutorService evalExecutor = Executors.newSingleThreadExecutor();
+ final ExecutorService evalExecutor = Executors.newSingleThreadExecutor(testingThreadFactory);
final GremlinExecutor gremlinExecutor = GremlinExecutor.build()
.afterSuccess(b -> {
final Graph graph = (Graph) b.get("g");
@@ -209,22 +220,28 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
final AtomicInteger vertexCount = new AtomicInteger(0);
- final ExecutorService iterationExecutor = Executors.newSingleThreadExecutor();
+ final ExecutorService iterationExecutor = Executors.newSingleThreadExecutor(testingThreadFactory);
gremlinExecutor.eval("g.V().out()", bindings).thenAcceptAsync(o -> {
final Iterator itty = (Iterator) o;
itty.forEachRemaining(v -> vertexCount.incrementAndGet());
}, iterationExecutor).join();
assertEquals(6, vertexCount.get());
+
+ gremlinExecutor.close();
+ evalExecutor.shutdown();
+ evalExecutor.awaitTermination(30000, TimeUnit.MILLISECONDS);
+ iterationExecutor.shutdown();
+ iterationExecutor.awaitTermination(30000, TimeUnit.MILLISECONDS);
}
@Test
@LoadGraphWith(LoadGraphWith.GraphData.MODERN)
- public void shouldAllowTraversalToIterateInDifferentThreadThanOriginallyEvaluatedWithoutAutoCommit() {
+ public void shouldAllowTraversalToIterateInDifferentThreadThanOriginallyEvaluatedWithoutAutoCommit() throws Exception {
// this test sort of simulates Gremlin Server interaction where a Traversal is eval'd in one Thread, but
// then iterated in another. this basically tests the state of the Gremlin Server GremlinExecutor when
// being used in session mode
- final ExecutorService evalExecutor = Executors.newSingleThreadExecutor();
+ final ExecutorService evalExecutor = Executors.newSingleThreadExecutor(testingThreadFactory);
final GremlinExecutor gremlinExecutor = GremlinExecutor.build().executorService(evalExecutor).create();
final Map<String,Object> bindings = new HashMap<>();
@@ -232,13 +249,19 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
final AtomicInteger vertexCount = new AtomicInteger(0);
- final ExecutorService iterationExecutor = Executors.newSingleThreadExecutor();
+ final ExecutorService iterationExecutor = Executors.newSingleThreadExecutor(testingThreadFactory);
gremlinExecutor.eval("g.V().out()", bindings).thenAcceptAsync(o -> {
final Iterator itty = (Iterator) o;
itty.forEachRemaining(v -> vertexCount.incrementAndGet());
}, iterationExecutor).join();
assertEquals(6, vertexCount.get());
+
+ gremlinExecutor.close();
+ evalExecutor.shutdown();
+ evalExecutor.awaitTermination(30000, TimeUnit.MILLISECONDS);
+ iterationExecutor.shutdown();
+ iterationExecutor.awaitTermination(30000, TimeUnit.MILLISECONDS);
}
@Test
@@ -278,13 +301,16 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
assertEquals(2, i2.get());
assertFalse(b1.get());
assertFalse(b2.get());
+
+ gremlinExecutor.close();
+
}
@Test
public void shouldNotExhaustThreads() throws Exception {
// this is not representative of how the GremlinExecutor should be configured. A single thread executor
// shared will create odd behaviors, but it's good for this test.
- final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+ final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(testingThreadFactory);
final GremlinExecutor gremlinExecutor = GremlinExecutor.build()
.executorService(executorService)
.scheduledExecutorService(executorService).create();
@@ -301,6 +327,9 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
}));
assertEquals(1000, count.intValue());
+
+ executorService.shutdown();
+ executorService.awaitTermination(30000, TimeUnit.MILLISECONDS);
}
@Test
@@ -341,6 +370,8 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
assertTrue(successes.intValue() > 0);
assertTrue(failures.intValue() >= 500);
+
+ gremlinExecutor.close();
}
@Test
@@ -354,6 +385,8 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
.create();
assertEquals(2, gremlinExecutor.eval("add(1,1)").get());
+
+ gremlinExecutor.close();
}
@Test
@@ -422,5 +455,20 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
gremlinExecutor.getScriptEngines().reset();
assertEquals(2, gremlinExecutor.eval("add(1,1)").get());
+
+ gremlinExecutor.close();
+ }
+
+ @Test
+ public void shouldNotShutdownExecutorServicesSuppliedToGremlinExecutor() throws Exception {
+ final ScheduledExecutorService service = Executors.newScheduledThreadPool(4, testingThreadFactory);
+ final GremlinExecutor gremlinExecutor = GremlinExecutor.build()
+ .executorService(service)
+ .scheduledExecutorService(service).create();
+
+ gremlinExecutor.close();
+ assertFalse(service.isShutdown());
+ service.shutdown();
+ service.awaitTermination(30000, TimeUnit.MILLISECONDS);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4050ded7/gremlin-groovy-test/src/main/java/com/tinkerpop/gremlin/groovy/jsr223/GremlinGroovyScriptEngineTest.java
----------------------------------------------------------------------
diff --git a/gremlin-groovy-test/src/main/java/com/tinkerpop/gremlin/groovy/jsr223/GremlinGroovyScriptEngineTest.java b/gremlin-groovy-test/src/main/java/com/tinkerpop/gremlin/groovy/jsr223/GremlinGroovyScriptEngineTest.java
index 1f7dbe4..324eea0 100644
--- a/gremlin-groovy-test/src/main/java/com/tinkerpop/gremlin/groovy/jsr223/GremlinGroovyScriptEngineTest.java
+++ b/gremlin-groovy-test/src/main/java/com/tinkerpop/gremlin/groovy/jsr223/GremlinGroovyScriptEngineTest.java
@@ -230,7 +230,7 @@ public class GremlinGroovyScriptEngineTest extends AbstractGremlinTest {
final Random random = new Random();
for (int i = 0; i < runs; i++) {
- new Thread() {
+ new Thread("test-thread-safe-" + i) {
public void run() {
String name = names.get(random.nextInt(names.size() - 1));
try {
@@ -264,7 +264,7 @@ public class GremlinGroovyScriptEngineTest extends AbstractGremlinTest {
final Random random = new Random();
for (int i = 0; i < runs; i++) {
- new Thread() {
+ new Thread("test-thread-safety-on-compiled-script-" + i) {
public void run() {
String name = names.get(random.nextInt(names.size() - 1));
try {
@@ -459,7 +459,7 @@ public class GremlinGroovyScriptEngineTest extends AbstractGremlinTest {
se.printStackTrace();
fail.set(true);
}
- });
+ }, "test-reload-classloader-1");
t.start();
@@ -473,7 +473,7 @@ public class GremlinGroovyScriptEngineTest extends AbstractGremlinTest {
}};
scriptEngine.addImports(imports);
latch.countDown();
- }).start();
+ }, "test-reload-classloader-2").start();
t.join();
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4050ded7/gremlin-groovy/src/main/java/com/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-groovy/src/main/java/com/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java b/gremlin-groovy/src/main/java/com/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
index f1184ad..51c668b 100644
--- a/gremlin-groovy/src/main/java/com/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
+++ b/gremlin-groovy/src/main/java/com/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
@@ -52,7 +52,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
-import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
/**
@@ -88,13 +87,15 @@ public class GremlinExecutor implements AutoCloseable {
private final Consumer<Bindings> afterTimeout;
private final BiConsumer<Bindings, Exception> afterFailure;
private final Set<String> enabledPlugins;
+ private final boolean suppliedExecutor;
+ private final boolean suppliedScheduledExecutor;
private GremlinExecutor(final Map<String, EngineSettings> settings, final List<List<String>> use,
final long scriptEvaluationTimeout, final Bindings globalBindings,
final ExecutorService executorService, final ScheduledExecutorService scheduledExecutorService,
final Consumer<Bindings> beforeEval, final Consumer<Bindings> afterSuccess,
final Consumer<Bindings> afterTimeout, final BiConsumer<Bindings, Exception> afterFailure,
- final Set<String> enabledPlugins) {
+ final Set<String> enabledPlugins, final boolean suppliedExecutor, final boolean suppliedScheduledExecutor) {
this.executorService = executorService;
this.scheduledExecutorService = scheduledExecutorService;
this.beforeEval = beforeEval;
@@ -107,6 +108,8 @@ public class GremlinExecutor implements AutoCloseable {
this.globalBindings = globalBindings;
this.enabledPlugins = enabledPlugins;
this.scriptEngines = createScriptEngines();
+ this.suppliedExecutor = suppliedExecutor;
+ this.suppliedScheduledExecutor = suppliedScheduledExecutor;
}
public CompletableFuture<Object> eval(final String script) {
@@ -193,17 +196,59 @@ public class GremlinExecutor implements AutoCloseable {
/**
* {@inheritDoc}
* <p/>
- * Note that the executors are not closed by virtue of this operation. Manage them manually.
+ * Executors are only closed if they were not supplied externally in the
+ * {@link com.tinkerpop.gremlin.groovy.engine.GremlinExecutor.Builder}
*/
@Override
public void close() throws Exception {
- // leave pools running as they are supplied externally. let the sender be responsible for shutting them down
- scriptEngines.close();
+ closeAsync().join();
+ }
+
+ /**
+ * Executors are only closed if they were not supplied externally in the
+ * {@link com.tinkerpop.gremlin.groovy.engine.GremlinExecutor.Builder}
+ */
+ public CompletableFuture<Void> closeAsync() throws Exception {
+ final CompletableFuture<Void> future = new CompletableFuture<>();
+
+ new Thread(() -> {
+ // leave pools running if they are supplied externally. let the sender be responsible for shutting them down
+ if (!suppliedExecutor) {
+ executorService.shutdown();
+ try {
+ if (!executorService.awaitTermination(180000, TimeUnit.MILLISECONDS))
+ logger.warn("Timeout while waiting for ExecutorService of GremlinExecutor to shutdown.");
+ } catch (InterruptedException ie) {
+ logger.warn("ExecutorService on GremlinExecutor may not have shutdown properly as shutdown thread terminated early.");
+ }
+ }
+
+ // calls to shutdown are idempotent so no problems calling it twice if the pool is shared
+ if (!suppliedScheduledExecutor) {
+ scheduledExecutorService.shutdown();
+ try {
+ if (!scheduledExecutorService.awaitTermination(180000, TimeUnit.MILLISECONDS))
+ logger.warn("Timeout while waiting for ScheduledExecutorService of GremlinExecutor to shutdown.");
+ } catch (InterruptedException ie) {
+ logger.warn("ScheduledExecutorService on GremlinExecutor may not have shutdown properly as shutdown thread terminated early.");
+ }
+ }
+
+ try {
+ scriptEngines.close();
+ } catch (Exception ex) {
+ logger.warn("Error while shutting down the ScriptEngines in the GremlinExecutor", ex);
+ }
+
+ future.complete(null);
+ }, "gremlin-executor-close").start();
+
+ return future;
}
private void scheduleTimeout(final CompletableFuture<Object> evaluationFuture, final String script, final AtomicBoolean abort) {
if (scriptEvaluationTimeout > 0) {
- // Schedule a timeout in the io threadpool for future execution - killing an eval is cheap
+ // Schedule a timeout in the threadpool for future execution - killing an eval is cheap
final ScheduledFuture<?> sf = scheduledExecutorService.schedule(() -> {
logger.info("Timing out script - {} - in thread [{}]", script, Thread.currentThread().getName());
@@ -332,6 +377,9 @@ public class GremlinExecutor implements AutoCloseable {
private List<List<String>> use = new ArrayList<>();
private Bindings globalBindings = new SimpleBindings();
+ private boolean suppliedExecutor = false;
+ private boolean suppliedScheduledExecutor = false;
+
private Builder() {
final BasicThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("gremlin-executor-%d").build();
this.scheduledExecutorService = Executors.newScheduledThreadPool(4, threadFactory);
@@ -390,6 +438,7 @@ public class GremlinExecutor implements AutoCloseable {
*/
public Builder executorService(final ExecutorService executorService) {
this.executorService = executorService;
+ this.suppliedExecutor = true;
return this;
}
@@ -398,6 +447,7 @@ public class GremlinExecutor implements AutoCloseable {
*/
public Builder scheduledExecutorService(final ScheduledExecutorService scheduledExecutorService) {
this.scheduledExecutorService = scheduledExecutorService;
+ this.suppliedScheduledExecutor = true;
return this;
}
@@ -451,7 +501,8 @@ public class GremlinExecutor implements AutoCloseable {
public GremlinExecutor create() {
return new GremlinExecutor(settings, use, scriptEvaluationTimeout, globalBindings, executorService,
- scheduledExecutorService, beforeEval, afterSuccess, afterTimeout, afterFailure, enabledPlugins);
+ scheduledExecutorService, beforeEval, afterSuccess, afterTimeout, afterFailure, enabledPlugins,
+ suppliedExecutor, suppliedScheduledExecutor);
}
}