You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/09/29 19:46:37 UTC

incubator-tinkerpop git commit: threading issue in TinkerGraphComputer. Trying to fix.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/thread-issue-tinkergraph [created] a23dfa4a4


threading issue in TinkerGraphComputer. Trying to fix.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/a23dfa4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/a23dfa4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/a23dfa4a

Branch: refs/heads/thread-issue-tinkergraph
Commit: a23dfa4a4fbbb033d4badc2f2bf01cf30be4e975
Parents: 3373238
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Sep 29 11:46:22 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Sep 29 11:46:22 2015 -0600

----------------------------------------------------------------------
 .../process/computer/GiraphGraphComputer.java   |  6 ++
 .../gremlin/process/computer/GraphComputer.java | 13 +++
 .../process/computer/GraphComputerTest.java     | 89 +++++++++++++++++++-
 .../computer/AbstractHadoopGraphComputer.java   |  7 ++
 .../process/computer/SparkGraphComputer.java    | 20 +++--
 .../process/computer/TinkerGraphComputer.java   |  9 +-
 .../process/computer/TinkerWorkerPool.java      | 37 +++++---
 7 files changed, 160 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/a23dfa4a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index bdea4f1..7c11b69 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -93,6 +93,12 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
     }
 
     @Override
+    public GraphComputer workers(final int workers) {
+        this.giraphConfiguration.setWorkerConfiguration(this.workers, this.workers, 100.0F);
+        return super.workers(workers);
+    }
+
+    @Override
     public Future<ComputerResult> submit() {
         final long startTime = System.currentTimeMillis();
         super.validateStatePriorToExecution();

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/a23dfa4a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
index 1cefc47..75c72df 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
@@ -95,6 +95,15 @@ public interface GraphComputer {
     public GraphComputer mapReduce(final MapReduce mapReduce);
 
     /**
+     * Set the desired number of workers to execute the {@code VertexProgram} and {@code MapReduce} jobs.
+     * This is a recommendation to the underlying {@code GraphComputer} implementation and is allowed to deviate accordingly by the implementation.
+     *
+     * @param workers the number of workers to execute the submission
+     * @return the updated GraphComputer with newly set worker count
+     */
+    public GraphComputer workers(final int workers);
+
+    /**
      * Submit the {@link VertexProgram} and the set of {@link MapReduce} jobs for execution by the {@link GraphComputer}.
      *
      * @return a {@link Future} denoting a reference to the asynchronous computation and where to get the {@link org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult} when its is complete.
@@ -108,6 +117,10 @@ public interface GraphComputer {
 
     public interface Features {
 
+        public default boolean supportsWorkerSpecification() {
+            return true;
+        }
+
         public default boolean supportsGlobalMessageScopes() {
             return true;
         }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/a23dfa4a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
index 40ae469..2c57e07 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.process.computer;
 
+import org.apache.commons.configuration.Configuration;
 import org.apache.tinkerpop.gremlin.ExceptionCoverage;
 import org.apache.tinkerpop.gremlin.LoadGraphWith;
 import org.apache.tinkerpop.gremlin.process.AbstractGremlinProcessTest;
@@ -43,6 +44,7 @@ import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.GRATEFUL;
 import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.MODERN;
 import static org.junit.Assert.*;
 
@@ -67,7 +69,7 @@ import static org.junit.Assert.*;
 @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
 public class GraphComputerTest extends AbstractGremlinProcessTest {
 
-    @Test
+   /* @Test
     @LoadGraphWith(MODERN)
     public void shouldHaveStandardStringRepresentation() {
         final GraphComputer computer = graph.compute(graphComputerClass.get());
@@ -120,6 +122,11 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         }
 
         @Override
+        public GraphComputer workers(final int workers) {
+            return null;
+        }
+
+        @Override
         public Future<ComputerResult> submit() {
             return null;
         }
@@ -812,6 +819,7 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
             return list;
         }
     }
+
     /////////////////////////////////////////////
     @Test
     @LoadGraphWith(MODERN)
@@ -1399,8 +1407,85 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         public GraphComputer.Persist getPreferredPersist() {
             return GraphComputer.Persist.EDGES;
         }
-    }
+    }*/
 
     /////////////////////////////////////////////
 
+    @Test
+    @LoadGraphWith(GRATEFUL)
+    public void shouldSupportWorkerCount() throws Exception {
+        final GraphComputer computer = graph.compute(graphComputerClass.get());
+        if (computer.features().supportsWorkerSpecification()) {
+            ComputerResult result = computer.program(new VertexProgramL()).workers(1).submit().get();
+            assertEquals(1l, (long) result.memory().get("workerCount"));
+            ////
+            result = graph.compute(graphComputerClass.get()).program(new VertexProgramL()).workers(2).submit().get();
+            assertEquals(2l, (long) result.memory().get("workerCount"));
+        }
+    }
+
+    public static class VertexProgramL implements VertexProgram {
+
+        final Set<String> threadIds = new ConcurrentSkipListSet<>();
+
+        @Override
+        public void setup(final Memory memory) {
+            memory.set("workerCount", 0l);
+        }
+
+        @Override
+        public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
+            if(!this.threadIds.contains(Thread.currentThread().getName())) {
+                memory.incr("workerCount", 1l);
+                this.threadIds.add(Thread.currentThread().getName());
+                System.out.println(this.threadIds);
+            }
+        }
+
+        @Override
+        public boolean terminate(final Memory memory) {
+            return true;
+        }
+
+        @Override
+        public Set<String> getMemoryComputeKeys() {
+            return new HashSet<>(Arrays.asList("workerCount"));
+        }
+
+        /*public void workerIterationStart(final Memory memory) {
+            assertEquals(0l, (long) memory.get("workerCount"));
+        }
+
+        public void workerIterationEnd(final Memory memory) {
+            assertEquals(1l, (long) memory.get("workerCount"));
+        }*/
+
+        @Override
+        public Set<MessageScope> getMessageScopes(Memory memory) {
+            return Collections.emptySet();
+        }
+
+        @Override
+        public GraphComputer.ResultGraph getPreferredResultGraph() {
+            return GraphComputer.ResultGraph.NEW;
+        }
+
+        @Override
+        public GraphComputer.Persist getPreferredPersist() {
+            return GraphComputer.Persist.NOTHING;
+        }
+
+        @Override
+        @SuppressWarnings("CloneDoesntCallSuperClone,CloneDoesntDeclareCloneNotSupportedException")
+        public VertexProgramL clone() {
+            return new VertexProgramL();
+        }
+
+        @Override
+        public void storeState(final Configuration configuration) {
+            VertexProgram.super.storeState(configuration);
+        }
+
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/a23dfa4a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
index 4589a0c..ca2a931 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
@@ -47,6 +47,7 @@ public abstract class AbstractHadoopGraphComputer implements GraphComputer {
     protected boolean executed = false;
     protected final Set<MapReduce> mapReducers = new HashSet<>();
     protected VertexProgram<Object> vertexProgram;
+    protected int workers = -1;
 
     protected ResultGraph resultGraph = null;
     protected Persist persist = null;
@@ -81,6 +82,12 @@ public abstract class AbstractHadoopGraphComputer implements GraphComputer {
     }
 
     @Override
+    public GraphComputer workers(final int workers) {
+        this.workers = workers;
+        return this;
+    }
+
+    @Override
     public String toString() {
         return StringFactory.graphComputerString(this);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/a23dfa4a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index be5c19e..0f14b05 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -31,11 +31,6 @@ import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
-import org.apache.tinkerpop.gremlin.spark.process.computer.io.InputFormatRDD;
-import org.apache.tinkerpop.gremlin.spark.process.computer.io.InputRDD;
-import org.apache.tinkerpop.gremlin.spark.process.computer.io.OutputFormatRDD;
-import org.apache.tinkerpop.gremlin.spark.process.computer.io.OutputRDD;
-import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
@@ -48,6 +43,11 @@ import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
+import org.apache.tinkerpop.gremlin.spark.process.computer.io.InputFormatRDD;
+import org.apache.tinkerpop.gremlin.spark.process.computer.io.InputRDD;
+import org.apache.tinkerpop.gremlin.spark.process.computer.io.OutputFormatRDD;
+import org.apache.tinkerpop.gremlin.spark.process.computer.io.OutputRDD;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
 
 import java.io.File;
 import java.io.IOException;
@@ -65,6 +65,15 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
     }
 
     @Override
+    public GraphComputer workers(final int workers) {
+        super.workers(workers);
+        if (this.hadoopGraph.configuration().getString("spark.master").startsWith("local")) {
+            this.hadoopGraph.configuration().setProperty("spark.master", "local[" + this.workers + "]");
+        }
+        return this;
+    }
+
+    @Override
     public Future<ComputerResult> submit() {
         super.validateStatePriorToExecution();
         // apache and hadoop configurations that are used throughout the graph computer computation
@@ -137,7 +146,6 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                     this.vertexProgram.storeState(vertexProgramConfiguration);
                     ConfigurationUtils.copy(vertexProgramConfiguration, apacheConfiguration);
                     ConfUtil.mergeApacheIntoHadoopConfiguration(vertexProgramConfiguration, hadoopConfiguration);
-
                     // execute the vertex program
                     while (true) {
                         memory.setInTask(true);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/a23dfa4a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index 4d0e631..9840d8a 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -55,6 +55,7 @@ public final class TinkerGraphComputer implements GraphComputer {
     private final TinkerMessageBoard messageBoard = new TinkerMessageBoard();
     private boolean executed = false;
     private final Set<MapReduce> mapReducers = new HashSet<>();
+    private int workers = Runtime.getRuntime().availableProcessors();
 
     public TinkerGraphComputer(final TinkerGraph graph) {
         this.graph = graph;
@@ -85,6 +86,12 @@ public final class TinkerGraphComputer implements GraphComputer {
     }
 
     @Override
+    public GraphComputer workers(final int workers) {
+        this.workers = workers;
+        return this;
+    }
+
+    @Override
     public Future<ComputerResult> submit() {
         // a graph computer can only be executed once
         if (this.executed)
@@ -109,7 +116,7 @@ public final class TinkerGraphComputer implements GraphComputer {
         this.memory = new TinkerMemory(this.vertexProgram, this.mapReducers);
         return CompletableFuture.<ComputerResult>supplyAsync(() -> {
             final long time = System.currentTimeMillis();
-            try (final TinkerWorkerPool workers = new TinkerWorkerPool(Runtime.getRuntime().availableProcessors())) {
+            try (final TinkerWorkerPool workers = new TinkerWorkerPool(this.workers)) {
                 if (null != this.vertexProgram) {
                     TinkerHelper.createGraphComputerView(this.graph, this.vertexProgram.getElementComputeKeys());
                     // execute the vertex program

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/a23dfa4a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
index b47809f..c6d3f11 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
@@ -20,13 +20,16 @@ package org.apache.tinkerpop.gremlin.tinkergraph.process.computer;
 
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapReducePool;
 import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramPool;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.function.Consumer;
 
 /**
@@ -57,26 +60,36 @@ public final class TinkerWorkerPool implements AutoCloseable {
     }
 
     public void executeVertexProgram(final Consumer<VertexProgram> worker) {
-        try {
-            this.workerPool.submit(() -> {
+        final List<Callable<Object>> tasks = new ArrayList<>();
+        for (int i = 0; i < 1; i++) {
+            tasks.add(() -> {
                 final VertexProgram vp = this.vertexProgramPool.take();
                 worker.accept(vp);
                 this.vertexProgramPool.offer(vp);
-            }).get();
+                return null;
+            });
+        }
+        try {
+            final List<Future<Object>> futures = this.workerPool.invokeAll(tasks);
+           for(Future future : futures) {
+               future.get();
+           }
         } catch (final Exception e) {
             throw new IllegalStateException(e.getMessage(), e);
         }
     }
 
     public void executeMapReduce(final Consumer<MapReduce> worker) {
-        try {
-            this.workerPool.submit(() -> {
-                final MapReduce mr = this.mapReducePool.take();
-                worker.accept(mr);
-                this.mapReducePool.offer(mr);
-            }).get();
-        } catch (final Exception e) {
-            throw new IllegalStateException(e.getMessage(), e);
+        for (int i = 0; i < this.numberOfWorkers; i++) {
+            try {
+                this.workerPool.submit(() -> {
+                    final MapReduce mr = this.mapReducePool.take();
+                    worker.accept(mr);
+                    this.mapReducePool.offer(mr);
+                }).get();
+            } catch (final Exception e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
         }
     }