You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/09 14:54:14 UTC

[02/17] tinkerpop git commit: Added Serializer.cloneObject() which clones via serialization (helper method). TinkerGraphComputer now how a sound distributed Memory system where each worker/thread aggregates without concurrency locally and then, at the en

Added Serializer.cloneObject() which clones via serialization (helper method). TinkerGraphComputer now how a sound distributed Memory system where each worker/thread aggregates without concurrency locally and then, at the end of the iteration, the thread-distributed memories are aggregated into the main TinkerMemory.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/1ac003d0
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/1ac003d0
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/1ac003d0

Branch: refs/heads/tp32
Commit: 1ac003d00807c1351594d04a4bbdb55e93b00134
Parents: 056d7ae
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Jan 4 04:06:30 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Jan 5 16:59:45 2017 -0700

----------------------------------------------------------------------
 .../apache/tinkerpop/gremlin/util/Serializer.java    |  4 ++++
 .../process/computer/TinkerGraphComputer.java        |  8 +++-----
 .../process/computer/TinkerWorkerMemory.java         |  6 +++---
 .../process/computer/TinkerWorkerPool.java           | 15 ++++++++++++---
 4 files changed, 22 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1ac003d0/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/Serializer.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/Serializer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/Serializer.java
index 28bab16..988fce3 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/Serializer.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/Serializer.java
@@ -47,4 +47,8 @@ public final class Serializer {
         in.close();
         return object;
     }
+
+    public static <V> V cloneObject(final V object) throws IOException, ClassNotFoundException {
+        return (V) Serializer.deserializeObject(Serializer.serializeObject(object));
+    }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1ac003d0/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index fef2e1a..7523d63 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -157,7 +157,7 @@ public final class TinkerGraphComputer implements GraphComputer {
         return computerService.submit(() -> {
             final long time = System.currentTimeMillis();
             final TinkerGraphComputerView view;
-            final TinkerWorkerPool workers = new TinkerWorkerPool(this.workers);
+            final TinkerWorkerPool workers = new TinkerWorkerPool(this.memory, this.workers);
             try {
                 if (null != this.vertexProgram) {
                     view = TinkerHelper.createGraphComputerView(this.graph, this.graphFilter, this.vertexProgram.getVertexComputeKeys());
@@ -168,8 +168,7 @@ public final class TinkerGraphComputer implements GraphComputer {
                         this.memory.completeSubRound();
                         workers.setVertexProgram(this.vertexProgram);
                         final SynchronizedIterator<Vertex> vertices = new SynchronizedIterator<>(this.graph.vertices());
-                        workers.executeVertexProgram(vertexProgram -> {
-                            final TinkerWorkerMemory workerMemory = new TinkerWorkerMemory(this.memory);
+                        workers.executeVertexProgram((vertexProgram, workerMemory) -> {
                             vertexProgram.workerIterationStart(workerMemory.asImmutable());
                             while (true) {
                                 final Vertex vertex = vertices.next();
@@ -178,8 +177,7 @@ public final class TinkerGraphComputer implements GraphComputer {
                                 vertexProgram.execute(
                                         ComputerGraph.vertexProgram(vertex, vertexProgram),
                                         new TinkerMessenger<>(vertex, this.messageBoard, vertexProgram.getMessageCombiner()),
-                                        workerMemory
-                                );
+                                        workerMemory);
                             }
                             vertexProgram.workerIterationEnd(workerMemory.asImmutable());
                             workerMemory.complete();

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1ac003d0/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
index 28b99e3..081e4fa 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
@@ -21,7 +21,6 @@ package org.apache.tinkerpop.gremlin.tinkergraph.process.computer;
 
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
-import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.util.Serializer;
 
 import java.io.IOException;
@@ -43,10 +42,10 @@ public final class TinkerWorkerMemory implements Memory.Admin {
         this.mainMemory = mainMemory;
         for (final MemoryComputeKey key : this.mainMemory.memoryKeys.values()) {
             try {
-                final MemoryComputeKey clone = (MemoryComputeKey) Serializer.deserializeObject(Serializer.serializeObject(key));
+                final MemoryComputeKey clone = Serializer.cloneObject(key);
                 this.reducers.put(clone.getKey(), clone.getReducer());
             } catch (final IOException | ClassNotFoundException e) {
-                this.reducers.put(key.getKey(), key.getReducer()); // super ghetto
+                throw new IllegalStateException(e.getMessage(), e);
             }
         }
     }
@@ -112,5 +111,6 @@ public final class TinkerWorkerMemory implements Memory.Admin {
         for (final Map.Entry<String, Object> entry : this.workerMemory.entrySet()) {
             this.mainMemory.add(entry.getKey(), entry.getValue());
         }
+        this.workerMemory.clear();
     }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1ac003d0/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
index 3d851bf..140d347 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
@@ -24,10 +24,13 @@ import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapReducePool;
 import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramPool;
 
+import java.util.Queue;
 import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
 /**
@@ -44,11 +47,15 @@ public final class TinkerWorkerPool implements AutoCloseable {
 
     private VertexProgramPool vertexProgramPool;
     private MapReducePool mapReducePool;
+    private final Queue<TinkerWorkerMemory> workerMemoryPool = new ConcurrentLinkedQueue<>();
 
-    public TinkerWorkerPool(final int numberOfWorkers) {
+    public TinkerWorkerPool(final TinkerMemory memory, final int numberOfWorkers) {
         this.numberOfWorkers = numberOfWorkers;
         this.workerPool = Executors.newFixedThreadPool(numberOfWorkers, THREAD_FACTORY_WORKER);
         this.completionService = new ExecutorCompletionService<>(this.workerPool);
+        for (int i = 0; i < this.numberOfWorkers; i++) {
+            this.workerMemoryPool.add(new TinkerWorkerMemory(memory));
+        }
     }
 
     public void setVertexProgram(final VertexProgram vertexProgram) {
@@ -59,12 +66,14 @@ public final class TinkerWorkerPool implements AutoCloseable {
         this.mapReducePool = new MapReducePool(mapReduce, this.numberOfWorkers);
     }
 
-    public void executeVertexProgram(final Consumer<VertexProgram> worker) throws InterruptedException {
+    public void executeVertexProgram(final BiConsumer<VertexProgram, TinkerWorkerMemory> worker) throws InterruptedException {
         for (int i = 0; i < this.numberOfWorkers; i++) {
             this.completionService.submit(() -> {
                 final VertexProgram vp = this.vertexProgramPool.take();
-                worker.accept(vp);
+                final TinkerWorkerMemory workerMemory = this.workerMemoryPool.poll();
+                worker.accept(vp, workerMemory);
                 this.vertexProgramPool.offer(vp);
+                this.workerMemoryPool.offer(workerMemory);
                 return null;
             });
         }