You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2017/01/12 15:21:28 UTC

[11/50] [abbrv] tinkerpop git commit: no more SynchronizedIterator for TinkerGraphComputer. Each worker/thread has their own Iterator<Vertex> partition. In TinkerPop 3.3.0 (with Partitioner) this will be replaced (easily).

no more SynchronizedIterator for TinkerGraphComputer. Each worker/thread has their own Iterator<Vertex> partition. In TinkerPop 3.3.0 (with Partitioner) this will be replaced (easily).


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/3dd1f6e5
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/3dd1f6e5
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/3dd1f6e5

Branch: refs/heads/TINKERPOP-1565
Commit: 3dd1f6e5e141d715e678c66acd0516806e625047
Parents: 27a4b27
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Jan 4 12:39:35 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Jan 5 17:00:17 2017 -0700

----------------------------------------------------------------------
 .../process/computer/TinkerGraphComputer.java   | 14 +++-----
 .../process/computer/TinkerWorkerPool.java      | 36 +++++++++++++++++---
 2 files changed, 36 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3dd1f6e5/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index 7523d63..2abce9a 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -156,24 +156,21 @@ public final class TinkerGraphComputer implements GraphComputer {
         this.memory = new TinkerMemory(this.vertexProgram, this.mapReducers);
         return computerService.submit(() -> {
             final long time = System.currentTimeMillis();
-            final TinkerGraphComputerView view;
-            final TinkerWorkerPool workers = new TinkerWorkerPool(this.memory, this.workers);
+            final TinkerGraphComputerView view = TinkerHelper.createGraphComputerView(this.graph, this.graphFilter, null != this.vertexProgram ? this.vertexProgram.getVertexComputeKeys() : Collections.emptySet());
+            final TinkerWorkerPool workers = new TinkerWorkerPool(this.graph, this.memory, this.workers);
             try {
                 if (null != this.vertexProgram) {
-                    view = TinkerHelper.createGraphComputerView(this.graph, this.graphFilter, this.vertexProgram.getVertexComputeKeys());
                     // execute the vertex program
                     this.vertexProgram.setup(this.memory);
                     while (true) {
                         if (Thread.interrupted()) throw new TraversalInterruptedException();
                         this.memory.completeSubRound();
                         workers.setVertexProgram(this.vertexProgram);
-                        final SynchronizedIterator<Vertex> vertices = new SynchronizedIterator<>(this.graph.vertices());
-                        workers.executeVertexProgram((vertexProgram, workerMemory) -> {
+                        workers.executeVertexProgram((vertices, vertexProgram, workerMemory) -> {
                             vertexProgram.workerIterationStart(workerMemory.asImmutable());
-                            while (true) {
+                            while (vertices.hasNext()) {
                                 final Vertex vertex = vertices.next();
                                 if (Thread.interrupted()) throw new TraversalInterruptedException();
-                                if (null == vertex) break;
                                 vertexProgram.execute(
                                         ComputerGraph.vertexProgram(vertex, vertexProgram),
                                         new TinkerMessenger<>(vertex, this.messageBoard, vertexProgram.getMessageCombiner()),
@@ -192,9 +189,6 @@ public final class TinkerGraphComputer implements GraphComputer {
                         }
                     }
                     view.complete(); // drop all transient vertex compute keys
-                } else {
-                    // MapReduce only
-                    view = TinkerHelper.createGraphComputerView(this.graph, this.graphFilter, Collections.emptySet());
                 }
 
                 // execute mapreduce jobs

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3dd1f6e5/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
index 140d347..637b416 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
@@ -23,14 +23,20 @@ import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapReducePool;
 import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramPool;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerHelper;
+import org.apache.tinkerpop.gremlin.util.function.TriConsumer;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
 /**
@@ -48,13 +54,33 @@ public final class TinkerWorkerPool implements AutoCloseable {
     private VertexProgramPool vertexProgramPool;
     private MapReducePool mapReducePool;
     private final Queue<TinkerWorkerMemory> workerMemoryPool = new ConcurrentLinkedQueue<>();
+    private final List<List<Vertex>> workerVertices = new ArrayList<>();
 
-    public TinkerWorkerPool(final TinkerMemory memory, final int numberOfWorkers) {
+    public TinkerWorkerPool(final TinkerGraph graph, final TinkerMemory memory, final int numberOfWorkers) {
         this.numberOfWorkers = numberOfWorkers;
         this.workerPool = Executors.newFixedThreadPool(numberOfWorkers, THREAD_FACTORY_WORKER);
         this.completionService = new ExecutorCompletionService<>(this.workerPool);
         for (int i = 0; i < this.numberOfWorkers; i++) {
             this.workerMemoryPool.add(new TinkerWorkerMemory(memory));
+            this.workerVertices.add(new ArrayList<>());
+        }
+        int batchSize = TinkerHelper.getVertices(graph).size() / this.numberOfWorkers;
+        if (0 == batchSize)
+            batchSize = 1;
+        int counter = 0;
+        int index = 0;
+
+        List<Vertex> currentWorkerVertices = this.workerVertices.get(index);
+        final Iterator<Vertex> iterator = graph.vertices();
+        while (iterator.hasNext()) {
+            final Vertex vertex = iterator.next();
+            if (counter++ < batchSize || index == this.workerVertices.size() - 1) {
+                currentWorkerVertices.add(vertex);
+            } else {
+                currentWorkerVertices = this.workerVertices.get(++index);
+                currentWorkerVertices.add(vertex);
+                counter = 1;
+            }
         }
     }
 
@@ -66,12 +92,14 @@ public final class TinkerWorkerPool implements AutoCloseable {
         this.mapReducePool = new MapReducePool(mapReduce, this.numberOfWorkers);
     }
 
-    public void executeVertexProgram(final BiConsumer<VertexProgram, TinkerWorkerMemory> worker) throws InterruptedException {
+    public void executeVertexProgram(final TriConsumer<Iterator<Vertex>, VertexProgram, TinkerWorkerMemory> worker) throws InterruptedException {
         for (int i = 0; i < this.numberOfWorkers; i++) {
+            final int index = i;
             this.completionService.submit(() -> {
                 final VertexProgram vp = this.vertexProgramPool.take();
                 final TinkerWorkerMemory workerMemory = this.workerMemoryPool.poll();
-                worker.accept(vp, workerMemory);
+                final List<Vertex> vertices = this.workerVertices.get(index);
+                worker.accept(vertices.iterator(), vp, workerMemory);
                 this.vertexProgramPool.offer(vp);
                 this.workerMemoryPool.offer(workerMemory);
                 return null;