You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/09 14:54:21 UTC
[09/17] tinkerpop git commit: no more SynchronizedIterator for
TinkerGraphComputer. Each worker/thread has their own Iterator
partition. In TinkerPop 3.3.0 (with Partitioner) this will be replaced
(easily).
no more SynchronizedIterator for TinkerGraphComputer. Each worker/thread has their own Iterator<Vertex> partition. In TinkerPop 3.3.0 (with Partitioner) this will be replaced (easily).
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/3dd1f6e5
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/3dd1f6e5
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/3dd1f6e5
Branch: refs/heads/tp32
Commit: 3dd1f6e5e141d715e678c66acd0516806e625047
Parents: 27a4b27
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Jan 4 12:39:35 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Jan 5 17:00:17 2017 -0700
----------------------------------------------------------------------
.../process/computer/TinkerGraphComputer.java | 14 +++-----
.../process/computer/TinkerWorkerPool.java | 36 +++++++++++++++++---
2 files changed, 36 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3dd1f6e5/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index 7523d63..2abce9a 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -156,24 +156,21 @@ public final class TinkerGraphComputer implements GraphComputer {
this.memory = new TinkerMemory(this.vertexProgram, this.mapReducers);
return computerService.submit(() -> {
final long time = System.currentTimeMillis();
- final TinkerGraphComputerView view;
- final TinkerWorkerPool workers = new TinkerWorkerPool(this.memory, this.workers);
+ final TinkerGraphComputerView view = TinkerHelper.createGraphComputerView(this.graph, this.graphFilter, null != this.vertexProgram ? this.vertexProgram.getVertexComputeKeys() : Collections.emptySet());
+ final TinkerWorkerPool workers = new TinkerWorkerPool(this.graph, this.memory, this.workers);
try {
if (null != this.vertexProgram) {
- view = TinkerHelper.createGraphComputerView(this.graph, this.graphFilter, this.vertexProgram.getVertexComputeKeys());
// execute the vertex program
this.vertexProgram.setup(this.memory);
while (true) {
if (Thread.interrupted()) throw new TraversalInterruptedException();
this.memory.completeSubRound();
workers.setVertexProgram(this.vertexProgram);
- final SynchronizedIterator<Vertex> vertices = new SynchronizedIterator<>(this.graph.vertices());
- workers.executeVertexProgram((vertexProgram, workerMemory) -> {
+ workers.executeVertexProgram((vertices, vertexProgram, workerMemory) -> {
vertexProgram.workerIterationStart(workerMemory.asImmutable());
- while (true) {
+ while (vertices.hasNext()) {
final Vertex vertex = vertices.next();
if (Thread.interrupted()) throw new TraversalInterruptedException();
- if (null == vertex) break;
vertexProgram.execute(
ComputerGraph.vertexProgram(vertex, vertexProgram),
new TinkerMessenger<>(vertex, this.messageBoard, vertexProgram.getMessageCombiner()),
@@ -192,9 +189,6 @@ public final class TinkerGraphComputer implements GraphComputer {
}
}
view.complete(); // drop all transient vertex compute keys
- } else {
- // MapReduce only
- view = TinkerHelper.createGraphComputerView(this.graph, this.graphFilter, Collections.emptySet());
}
// execute mapreduce jobs
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3dd1f6e5/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
index 140d347..637b416 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
@@ -23,14 +23,20 @@ import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.MapReducePool;
import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramPool;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerHelper;
+import org.apache.tinkerpop.gremlin.util.function.TriConsumer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.function.BiConsumer;
import java.util.function.Consumer;
/**
@@ -48,13 +54,33 @@ public final class TinkerWorkerPool implements AutoCloseable {
private VertexProgramPool vertexProgramPool;
private MapReducePool mapReducePool;
private final Queue<TinkerWorkerMemory> workerMemoryPool = new ConcurrentLinkedQueue<>();
+ private final List<List<Vertex>> workerVertices = new ArrayList<>();
- public TinkerWorkerPool(final TinkerMemory memory, final int numberOfWorkers) {
+ public TinkerWorkerPool(final TinkerGraph graph, final TinkerMemory memory, final int numberOfWorkers) {
this.numberOfWorkers = numberOfWorkers;
this.workerPool = Executors.newFixedThreadPool(numberOfWorkers, THREAD_FACTORY_WORKER);
this.completionService = new ExecutorCompletionService<>(this.workerPool);
for (int i = 0; i < this.numberOfWorkers; i++) {
this.workerMemoryPool.add(new TinkerWorkerMemory(memory));
+ this.workerVertices.add(new ArrayList<>());
+ }
+ int batchSize = TinkerHelper.getVertices(graph).size() / this.numberOfWorkers;
+ if (0 == batchSize)
+ batchSize = 1;
+ int counter = 0;
+ int index = 0;
+
+ List<Vertex> currentWorkerVertices = this.workerVertices.get(index);
+ final Iterator<Vertex> iterator = graph.vertices();
+ while (iterator.hasNext()) {
+ final Vertex vertex = iterator.next();
+ if (counter++ < batchSize || index == this.workerVertices.size() - 1) {
+ currentWorkerVertices.add(vertex);
+ } else {
+ currentWorkerVertices = this.workerVertices.get(++index);
+ currentWorkerVertices.add(vertex);
+ counter = 1;
+ }
}
}
@@ -66,12 +92,14 @@ public final class TinkerWorkerPool implements AutoCloseable {
this.mapReducePool = new MapReducePool(mapReduce, this.numberOfWorkers);
}
- public void executeVertexProgram(final BiConsumer<VertexProgram, TinkerWorkerMemory> worker) throws InterruptedException {
+ public void executeVertexProgram(final TriConsumer<Iterator<Vertex>, VertexProgram, TinkerWorkerMemory> worker) throws InterruptedException {
for (int i = 0; i < this.numberOfWorkers; i++) {
+ final int index = i;
this.completionService.submit(() -> {
final VertexProgram vp = this.vertexProgramPool.take();
final TinkerWorkerMemory workerMemory = this.workerMemoryPool.poll();
- worker.accept(vp, workerMemory);
+ final List<Vertex> vertices = this.workerVertices.get(index);
+ worker.accept(vertices.iterator(), vp, workerMemory);
this.vertexProgramPool.offer(vp);
this.workerMemoryPool.offer(workerMemory);
return null;