You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/09/29 19:46:37 UTC
incubator-tinkerpop git commit: threading issue in TinkerGraphComputer. Trying to fix.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/thread-issue-tinkergraph [created] a23dfa4a4
threading issue in TinkerGraphComputer. Trying to fix.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/a23dfa4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/a23dfa4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/a23dfa4a
Branch: refs/heads/thread-issue-tinkergraph
Commit: a23dfa4a4fbbb033d4badc2f2bf01cf30be4e975
Parents: 3373238
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Sep 29 11:46:22 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Sep 29 11:46:22 2015 -0600
----------------------------------------------------------------------
.../process/computer/GiraphGraphComputer.java | 6 ++
.../gremlin/process/computer/GraphComputer.java | 13 +++
.../process/computer/GraphComputerTest.java | 89 +++++++++++++++++++-
.../computer/AbstractHadoopGraphComputer.java | 7 ++
.../process/computer/SparkGraphComputer.java | 20 +++--
.../process/computer/TinkerGraphComputer.java | 9 +-
.../process/computer/TinkerWorkerPool.java | 37 +++++---
7 files changed, 160 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/a23dfa4a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index bdea4f1..7c11b69 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -93,6 +93,12 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
}
@Override
+ public GraphComputer workers(final int workers) {
+ this.giraphConfiguration.setWorkerConfiguration(workers, workers, 100.0F); // use the argument: this.workers is only assigned by super.workers() below
+ return super.workers(workers);
+ }
+
+ @Override
public Future<ComputerResult> submit() {
final long startTime = System.currentTimeMillis();
super.validateStatePriorToExecution();
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/a23dfa4a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
index 1cefc47..75c72df 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
@@ -95,6 +95,15 @@ public interface GraphComputer {
public GraphComputer mapReduce(final MapReduce mapReduce);
/**
+ * Set the desired number of workers to execute the {@code VertexProgram} and {@code MapReduce} jobs.
+ * This is a recommendation to the underlying {@code GraphComputer} implementation and is allowed to deviate accordingly by the implementation.
+ *
+ * @param workers the number of workers to execute the submission
+ * @return the updated GraphComputer with newly set worker count
+ */
+ public GraphComputer workers(final int workers);
+
+ /**
* Submit the {@link VertexProgram} and the set of {@link MapReduce} jobs for execution by the {@link GraphComputer}.
*
* @return a {@link Future} denoting a reference to the asynchronous computation and where to get the {@link org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult} when its is complete.
@@ -108,6 +117,10 @@ public interface GraphComputer {
public interface Features {
+ public default boolean supportsWorkerSpecification() { // whether callers may request a worker count via workers(int); defaults to true
+ return true;
+ }
+
public default boolean supportsGlobalMessageScopes() {
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/a23dfa4a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
index 40ae469..2c57e07 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.tinkerpop.gremlin.process.computer;
+import org.apache.commons.configuration.Configuration;
import org.apache.tinkerpop.gremlin.ExceptionCoverage;
import org.apache.tinkerpop.gremlin.LoadGraphWith;
import org.apache.tinkerpop.gremlin.process.AbstractGremlinProcessTest;
@@ -43,6 +44,7 @@ import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
+import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.GRATEFUL;
import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.MODERN;
import static org.junit.Assert.*;
@@ -120,6 +122,11 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
}
@Override
+ public GraphComputer workers(final int workers) {
+ return null;
+ }
+
+ @Override
public Future<ComputerResult> submit() {
return null;
}
@@ -812,6 +819,7 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
return list;
}
}
+
/////////////////////////////////////////////
@Test
@LoadGraphWith(MODERN)
@@ -1399,8 +1407,81 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
public GraphComputer.Persist getPreferredPersist() {
return GraphComputer.Persist.EDGES;
}
}
/////////////////////////////////////////////
+ @Test
+ @LoadGraphWith(GRATEFUL)
+ public void shouldSupportWorkerCount() throws Exception {
+ final GraphComputer computer = graph.compute(graphComputerClass.get());
+ if (computer.features().supportsWorkerSpecification()) {
+ ComputerResult result = computer.program(new VertexProgramL()).workers(1).submit().get();
+ assertEquals(1L, (long) result.memory().get("workerCount"));
+ ////
+ result = graph.compute(graphComputerClass.get()).program(new VertexProgramL()).workers(2).submit().get();
+ assertEquals(2L, (long) result.memory().get("workerCount"));
+ }
+ }
+
+ /**
+ * Increments the "workerCount" memory key the first time a given thread calls execute() on this instance.
+ * NOTE(review): the count can drift from the requested worker count: a thread that is handed no vertices is
+ * never counted, and clone() creates a fresh thread-id set, so one thread is counted once per clone it executes.
+ */
+ public static class VertexProgramL implements VertexProgram {
+
+ final Set<String> threadIds = new ConcurrentSkipListSet<>();
+
+ @Override
+ public void setup(final Memory memory) {
+ memory.set("workerCount", 0L);
+ }
+
+ @Override
+ public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
+ if(!this.threadIds.contains(Thread.currentThread().getName())) {
+ memory.incr("workerCount", 1L);
+ this.threadIds.add(Thread.currentThread().getName());
+ }
+ }
+
+ @Override
+ public boolean terminate(final Memory memory) {
+ return true;
+ }
+
+ @Override
+ public Set<String> getMemoryComputeKeys() {
+ return new HashSet<>(Arrays.asList("workerCount"));
+ }
+
+ @Override
+ public Set<MessageScope> getMessageScopes(Memory memory) {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public GraphComputer.ResultGraph getPreferredResultGraph() {
+ return GraphComputer.ResultGraph.NEW;
+ }
+
+ @Override
+ public GraphComputer.Persist getPreferredPersist() {
+ return GraphComputer.Persist.NOTHING;
+ }
+
+ @Override
+ @SuppressWarnings("CloneDoesntCallSuperClone,CloneDoesntDeclareCloneNotSupportedException")
+ public VertexProgramL clone() {
+ return new VertexProgramL();
+ }
+
+ @Override
+ public void storeState(final Configuration configuration) {
+ VertexProgram.super.storeState(configuration);
+ }
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/a23dfa4a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
index 4589a0c..ca2a931 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
@@ -47,6 +47,7 @@ public abstract class AbstractHadoopGraphComputer implements GraphComputer {
protected boolean executed = false;
protected final Set<MapReduce> mapReducers = new HashSet<>();
protected VertexProgram<Object> vertexProgram;
+ protected int workers = -1; // -1 until workers(int) is called (presumably "no explicit worker count requested" — confirm)
protected ResultGraph resultGraph = null;
protected Persist persist = null;
@@ -81,6 +82,12 @@ public abstract class AbstractHadoopGraphComputer implements GraphComputer {
}
@Override
+ public GraphComputer workers(final int workers) {
+ this.workers = workers;
+ return this;
+ }
+
+ @Override
public String toString() {
return StringFactory.graphComputerString(this);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/a23dfa4a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index be5c19e..0f14b05 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -31,11 +31,6 @@ import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
-import org.apache.tinkerpop.gremlin.spark.process.computer.io.InputFormatRDD;
-import org.apache.tinkerpop.gremlin.spark.process.computer.io.InputRDD;
-import org.apache.tinkerpop.gremlin.spark.process.computer.io.OutputFormatRDD;
-import org.apache.tinkerpop.gremlin.spark.process.computer.io.OutputRDD;
-import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
@@ -48,6 +43,11 @@ import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
+import org.apache.tinkerpop.gremlin.spark.process.computer.io.InputFormatRDD;
+import org.apache.tinkerpop.gremlin.spark.process.computer.io.InputRDD;
+import org.apache.tinkerpop.gremlin.spark.process.computer.io.OutputFormatRDD;
+import org.apache.tinkerpop.gremlin.spark.process.computer.io.OutputRDD;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
import java.io.File;
import java.io.IOException;
@@ -65,6 +65,15 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
}
@Override
+ public GraphComputer workers(final int workers) {
+ super.workers(workers);
+ if (this.hadoopGraph.configuration().getString("spark.master", "").startsWith("local")) { // "spark.master" may be unset (getString would return null); only a local master is resized
+ this.hadoopGraph.configuration().setProperty("spark.master", "local[" + workers + "]");
+ }
+ return this;
+ }
+
+ @Override
public Future<ComputerResult> submit() {
super.validateStatePriorToExecution();
// apache and hadoop configurations that are used throughout the graph computer computation
@@ -137,7 +146,6 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
this.vertexProgram.storeState(vertexProgramConfiguration);
ConfigurationUtils.copy(vertexProgramConfiguration, apacheConfiguration);
ConfUtil.mergeApacheIntoHadoopConfiguration(vertexProgramConfiguration, hadoopConfiguration);
-
// execute the vertex program
while (true) {
memory.setInTask(true);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/a23dfa4a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index 4d0e631..9840d8a 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -55,6 +55,7 @@ public final class TinkerGraphComputer implements GraphComputer {
private final TinkerMessageBoard messageBoard = new TinkerMessageBoard();
private boolean executed = false;
private final Set<MapReduce> mapReducers = new HashSet<>();
+ private int workers = Runtime.getRuntime().availableProcessors(); // default: one worker per available processor; overridden by workers(int)
public TinkerGraphComputer(final TinkerGraph graph) {
this.graph = graph;
@@ -85,6 +86,12 @@ public final class TinkerGraphComputer implements GraphComputer {
}
@Override
+ public GraphComputer workers(final int workers) {
+ this.workers = workers;
+ return this;
+ }
+
+ @Override
public Future<ComputerResult> submit() {
// a graph computer can only be executed once
if (this.executed)
@@ -109,7 +116,7 @@ public final class TinkerGraphComputer implements GraphComputer {
this.memory = new TinkerMemory(this.vertexProgram, this.mapReducers);
return CompletableFuture.<ComputerResult>supplyAsync(() -> {
final long time = System.currentTimeMillis();
- try (final TinkerWorkerPool workers = new TinkerWorkerPool(Runtime.getRuntime().availableProcessors())) {
+ try (final TinkerWorkerPool workers = new TinkerWorkerPool(this.workers)) {
if (null != this.vertexProgram) {
TinkerHelper.createGraphComputerView(this.graph, this.vertexProgram.getElementComputeKeys());
// execute the vertex program
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/a23dfa4a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
index b47809f..c6d3f11 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
@@ -20,13 +20,16 @@ package org.apache.tinkerpop.gremlin.tinkergraph.process.computer;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.MapReducePool;
import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramPool;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.function.Consumer;
/**
@@ -57,26 +60,36 @@ public final class TinkerWorkerPool implements AutoCloseable {
}
public void executeVertexProgram(final Consumer<VertexProgram> worker) {
- try {
- this.workerPool.submit(() -> {
+ final List<Callable<Object>> tasks = new ArrayList<>(this.numberOfWorkers);
+ for (int i = 0; i < this.numberOfWorkers; i++) { // one task per worker (was hard-coded to 1, which ignored workers(int))
+ tasks.add(() -> {
final VertexProgram vp = this.vertexProgramPool.take();
worker.accept(vp);
this.vertexProgramPool.offer(vp);
- }).get();
+ return null;
+ });
+ }
+ try {
+ final List<Future<Object>> futures = this.workerPool.invokeAll(tasks);
+ for (final Future<Object> future : futures) {
+ future.get(); // surfaces (wrapped in ExecutionException) any exception thrown inside a worker task
+ }
} catch (final Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
public void executeMapReduce(final Consumer<MapReduce> worker) {
- try {
- this.workerPool.submit(() -> {
- final MapReduce mr = this.mapReducePool.take();
- worker.accept(mr);
- this.mapReducePool.offer(mr);
- }).get();
- } catch (final Exception e) {
- throw new IllegalStateException(e.getMessage(), e);
+ for (int i = 0; i < this.numberOfWorkers; i++) { // NOTE(review): each task is awaited via get() before the next is submitted, so map/reduce tasks run one at a time
+ try {
+ this.workerPool.submit(() -> {
+ final MapReduce mr = this.mapReducePool.take();
+ worker.accept(mr);
+ this.mapReducePool.offer(mr);
+ }).get();
+ } catch (final Exception e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
}
}