You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/06 00:00:39 UTC
[11/20] tinkerpop git commit: Added Serializer.cloneObject() which
clones via serialization (helper method). TinkerGraphComputer now has a sound
distributed Memory system where each worker/thread aggregates without
concurrency locally and then, at the en
Added Serializer.cloneObject() which clones via serialization (helper method). TinkerGraphComputer now has a sound distributed Memory system where each worker/thread aggregates without concurrency locally and then, at the end of the iteration, the thread-distributed memories are aggregated into the main TinkerMemory.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/1ac003d0
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/1ac003d0
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/1ac003d0
Branch: refs/heads/TINKERPOP-1585
Commit: 1ac003d00807c1351594d04a4bbdb55e93b00134
Parents: 056d7ae
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Jan 4 04:06:30 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Jan 5 16:59:45 2017 -0700
----------------------------------------------------------------------
.../apache/tinkerpop/gremlin/util/Serializer.java | 4 ++++
.../process/computer/TinkerGraphComputer.java | 8 +++-----
.../process/computer/TinkerWorkerMemory.java | 6 +++---
.../process/computer/TinkerWorkerPool.java | 15 ++++++++++++---
4 files changed, 22 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1ac003d0/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/Serializer.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/Serializer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/Serializer.java
index 28bab16..988fce3 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/Serializer.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/Serializer.java
@@ -47,4 +47,8 @@ public final class Serializer {
in.close();
return object;
}
+
+ public static <V> V cloneObject(final V object) throws IOException, ClassNotFoundException {
+ return (V) Serializer.deserializeObject(Serializer.serializeObject(object));
+ }
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1ac003d0/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index fef2e1a..7523d63 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -157,7 +157,7 @@ public final class TinkerGraphComputer implements GraphComputer {
return computerService.submit(() -> {
final long time = System.currentTimeMillis();
final TinkerGraphComputerView view;
- final TinkerWorkerPool workers = new TinkerWorkerPool(this.workers);
+ final TinkerWorkerPool workers = new TinkerWorkerPool(this.memory, this.workers);
try {
if (null != this.vertexProgram) {
view = TinkerHelper.createGraphComputerView(this.graph, this.graphFilter, this.vertexProgram.getVertexComputeKeys());
@@ -168,8 +168,7 @@ public final class TinkerGraphComputer implements GraphComputer {
this.memory.completeSubRound();
workers.setVertexProgram(this.vertexProgram);
final SynchronizedIterator<Vertex> vertices = new SynchronizedIterator<>(this.graph.vertices());
- workers.executeVertexProgram(vertexProgram -> {
- final TinkerWorkerMemory workerMemory = new TinkerWorkerMemory(this.memory);
+ workers.executeVertexProgram((vertexProgram, workerMemory) -> {
vertexProgram.workerIterationStart(workerMemory.asImmutable());
while (true) {
final Vertex vertex = vertices.next();
@@ -178,8 +177,7 @@ public final class TinkerGraphComputer implements GraphComputer {
vertexProgram.execute(
ComputerGraph.vertexProgram(vertex, vertexProgram),
new TinkerMessenger<>(vertex, this.messageBoard, vertexProgram.getMessageCombiner()),
- workerMemory
- );
+ workerMemory);
}
vertexProgram.workerIterationEnd(workerMemory.asImmutable());
workerMemory.complete();
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1ac003d0/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
index 28b99e3..081e4fa 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
@@ -21,7 +21,6 @@ package org.apache.tinkerpop.gremlin.tinkergraph.process.computer;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
-import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.util.Serializer;
import java.io.IOException;
@@ -43,10 +42,10 @@ public final class TinkerWorkerMemory implements Memory.Admin {
this.mainMemory = mainMemory;
for (final MemoryComputeKey key : this.mainMemory.memoryKeys.values()) {
try {
- final MemoryComputeKey clone = (MemoryComputeKey) Serializer.deserializeObject(Serializer.serializeObject(key));
+ final MemoryComputeKey clone = Serializer.cloneObject(key);
this.reducers.put(clone.getKey(), clone.getReducer());
} catch (final IOException | ClassNotFoundException e) {
- this.reducers.put(key.getKey(), key.getReducer()); // super ghetto
+ throw new IllegalStateException(e.getMessage(), e);
}
}
}
@@ -112,5 +111,6 @@ public final class TinkerWorkerMemory implements Memory.Admin {
for (final Map.Entry<String, Object> entry : this.workerMemory.entrySet()) {
this.mainMemory.add(entry.getKey(), entry.getValue());
}
+ this.workerMemory.clear();
}
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1ac003d0/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
index 3d851bf..140d347 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
@@ -24,10 +24,13 @@ import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.MapReducePool;
import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramPool;
+import java.util.Queue;
import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
/**
@@ -44,11 +47,15 @@ public final class TinkerWorkerPool implements AutoCloseable {
private VertexProgramPool vertexProgramPool;
private MapReducePool mapReducePool;
+ private final Queue<TinkerWorkerMemory> workerMemoryPool = new ConcurrentLinkedQueue<>();
- public TinkerWorkerPool(final int numberOfWorkers) {
+ public TinkerWorkerPool(final TinkerMemory memory, final int numberOfWorkers) {
this.numberOfWorkers = numberOfWorkers;
this.workerPool = Executors.newFixedThreadPool(numberOfWorkers, THREAD_FACTORY_WORKER);
this.completionService = new ExecutorCompletionService<>(this.workerPool);
+ for (int i = 0; i < this.numberOfWorkers; i++) {
+ this.workerMemoryPool.add(new TinkerWorkerMemory(memory));
+ }
}
public void setVertexProgram(final VertexProgram vertexProgram) {
@@ -59,12 +66,14 @@ public final class TinkerWorkerPool implements AutoCloseable {
this.mapReducePool = new MapReducePool(mapReduce, this.numberOfWorkers);
}
- public void executeVertexProgram(final Consumer<VertexProgram> worker) throws InterruptedException {
+ public void executeVertexProgram(final BiConsumer<VertexProgram, TinkerWorkerMemory> worker) throws InterruptedException {
for (int i = 0; i < this.numberOfWorkers; i++) {
this.completionService.submit(() -> {
final VertexProgram vp = this.vertexProgramPool.take();
- worker.accept(vp);
+ final TinkerWorkerMemory workerMemory = this.workerMemoryPool.poll();
+ worker.accept(vp, workerMemory);
this.vertexProgramPool.offer(vp);
+ this.workerMemoryPool.offer(workerMemory);
return null;
});
}