You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/18 15:20:59 UTC

incubator-tinkerpop git commit: introduced a VertexProgramPool for threaded GraphComputer workers operating on a VertexProgram that is not thread safe.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 97ece24d0 -> e2d3eeeac


introduced a VertexProgramPool for threaded GraphComputer workers operating on a VertexProgram that is not thread safe.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/e2d3eeea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/e2d3eeea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/e2d3eeea

Branch: refs/heads/master
Commit: e2d3eeeace615ae7b3a515e88627183ae7577a52
Parents: 97ece24
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Mar 18 08:20:56 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Mar 18 08:20:56 2015 -0600

----------------------------------------------------------------------
 .../traversal/TraversalVertexProgram.java       | 20 +++---
 .../computer/util/VertexProgramPool.java        | 71 ++++++++++++++++++++
 .../graph/traversal/step/map/TreeStep.java      |  4 +-
 .../gremlin/structure/io/gryo/GryoPool.java     | 24 +++----
 .../traversal/step/sideEffect/TreeTest.java     |  2 +-
 .../computer/giraph/GiraphComputeVertex.java    |  7 +-
 .../computer/giraph/GiraphGraphComputer.java    |  2 +-
 .../computer/giraph/GiraphMessenger.java        |  2 +-
 .../computer/giraph/GiraphWorkerContext.java    | 23 ++++---
 .../gremlin/hadoop/HadoopGraphProvider.java     | 11 ++-
 10 files changed, 119 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e2d3eeea/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
index 3732fab..527ce1f 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
@@ -141,19 +141,19 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
     @Override
     public void execute(final Vertex vertex, final Messenger<TraverserSet<?>> messenger, final Memory memory) {
         this.traversal.getSideEffects().setLocalVertex(vertex);
-        if (memory.isInitialIteration()) {
+        if (memory.isInitialIteration()) {    // ITERATION 1
             final TraverserSet<Object> haltedTraversers = new TraverserSet<>();
             vertex.property(HALTED_TRAVERSERS, haltedTraversers);
 
-            if (!(this.traversal.getStartStep() instanceof GraphStep))                              // TODO: make this generic to Traversal
+            if (!(this.traversal.getStartStep() instanceof GraphStep))
                 throw new UnsupportedOperationException("TraversalVertexProgram currently only supports GraphStep starts on vertices or edges");
 
-            final GraphStep<Element> startStep = (GraphStep<Element>) this.traversal.getStartStep();
+            final GraphStep<Element> graphStep = (GraphStep<Element>) this.traversal.getStartStep();
+            final String future = graphStep.getNextStep().getId();
             final TraverserGenerator traverserGenerator = this.traversal.getTraverserGenerator();
-            final String future = startStep.getNextStep().getId();
-            if (startStep.returnsVertices()) {  // VERTICES (process the first step locally)
-                if (ElementHelper.idExists(vertex.id(), startStep.getIds())) {
-                    final Traverser.Admin<Element> traverser = traverserGenerator.generate(vertex, startStep, 1l);
+            if (graphStep.returnsVertices()) {  // VERTICES (process the first step locally)
+                if (ElementHelper.idExists(vertex.id(), graphStep.getIds())) {
+                    final Traverser.Admin<Element> traverser = traverserGenerator.generate(vertex, graphStep, 1l);
                     traverser.setStepId(future);
                     traverser.detach();
                     if (traverser.isHalted())
@@ -166,8 +166,8 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
                 final Iterator<Edge> starts = vertex.edges(Direction.OUT);
                 while (starts.hasNext()) {
                     final Edge start = starts.next();
-                    if (ElementHelper.idExists(start.id(), startStep.getIds())) {
-                        final Traverser.Admin<Element> traverser = traverserGenerator.generate(start, startStep, 1l);
+                    if (ElementHelper.idExists(start.id(), graphStep.getIds())) {
+                        final Traverser.Admin<Element> traverser = traverserGenerator.generate(start, graphStep, 1l);
                         traverser.setStepId(future);
                         traverser.detach();
                         if (traverser.isHalted())
@@ -180,7 +180,7 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
                 }
                 memory.and(VOTE_TO_HALT, voteToHalt);
             }
-        } else {
+        } else {  // ITERATION 1+
             memory.and(VOTE_TO_HALT, TraverserExecutor.execute(vertex, messenger, this.traversalMatrix));
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e2d3eeea/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramPool.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramPool.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramPool.java
new file mode 100644
index 0000000..0be3719
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramPool.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.computer.util;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class VertexProgramPool {
+
+    private final LinkedBlockingQueue<VertexProgram<?>> pool;
+    private static final int TIMEOUT_MS = 2500;
+
+    public VertexProgramPool(final int poolSize, final Configuration configuration) {
+        this.pool = new LinkedBlockingQueue<>(poolSize);
+        while (this.pool.remainingCapacity() > 0) {
+            this.pool.add(VertexProgram.createVertexProgram(configuration));
+        }
+    }
+
+    public VertexProgram take() {
+        try {
+            return this.pool.poll(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        } catch (final InterruptedException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    public void offer(final VertexProgram<?> vertexProgram) {
+        try {
+            this.pool.offer(vertexProgram, TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        } catch (final InterruptedException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    public synchronized void workerIterationStart(final Memory memory) {
+        for (final VertexProgram<?> vertexProgram : this.pool) {
+            vertexProgram.workerIterationStart(memory);
+        }
+    }
+
+    public synchronized void workerIterationEnd(final Memory memory) {
+        for (final VertexProgram<?> vertexProgram : this.pool) {
+            vertexProgram.workerIterationEnd(memory);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e2d3eeea/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/TreeStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/TreeStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/TreeStep.java
index cdb62c5..046d535 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/TreeStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/TreeStep.java
@@ -160,7 +160,9 @@ public final class TreeStep<S> extends ReducingBarrierStep<S, Tree> implements M
 
         @Override
         public Tree generateFinalResult(final Iterator<KeyValue<NullObject, Tree>> keyValues) {
-            return keyValues.hasNext() ? keyValues.next().getValue() : new Tree();
+            final Tree tree = new Tree();
+            keyValues.forEachRemaining(keyValue -> tree.addTree(keyValue.getValue()));
+            return tree;
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e2d3eeea/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
index 5a55062..0b09c5d 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
@@ -37,46 +37,46 @@ public class GryoPool {
         this.gryoWriters = new ConcurrentLinkedQueue<>();
     }
 
-    public GryoReader getReader() {
+    public GryoReader takeReader() {
         final GryoReader reader = this.gryoReaders.poll();
         return (null == reader) ? GryoReader.build().create() : reader;
     }
 
-    public GryoWriter getWriter() {
+    public GryoWriter takeWriter() {
         final GryoWriter writer = this.gryoWriters.poll();
         return (null == writer) ? GryoWriter.build().create() : writer;
     }
 
-    public void addReader(final GryoReader gryoReader) {
+    public void offerReader(final GryoReader gryoReader) {
         if (this.gryoReaders.size() < MAX_QUEUE_SIZE)
             this.gryoReaders.offer(gryoReader);
     }
 
-    public void addWriter(final GryoWriter gryoWriter) {
+    public void offerWriter(final GryoWriter gryoWriter) {
         if (this.gryoWriters.size() < MAX_QUEUE_SIZE)
             this.gryoWriters.offer(gryoWriter);
     }
 
     public <A> A doWithReaderWriter(final BiFunction<GryoReader, GryoWriter, A> readerWriterBiFunction) {
-        final GryoReader gryoReader = this.getReader();
-        final GryoWriter gryoWriter = this.getWriter();
+        final GryoReader gryoReader = this.takeReader();
+        final GryoWriter gryoWriter = this.takeWriter();
         final A a = readerWriterBiFunction.apply(gryoReader, gryoWriter);
-        this.addReader(gryoReader);
-        this.addWriter(gryoWriter);
+        this.offerReader(gryoReader);
+        this.offerWriter(gryoWriter);
         return a;
     }
 
     public <A> A doWithReader(final Function<GryoReader, A> readerFunction) {
-        final GryoReader gryoReader = this.getReader();
+        final GryoReader gryoReader = this.takeReader();
         final A a = readerFunction.apply(gryoReader);
-        this.addReader(gryoReader);
+        this.offerReader(gryoReader);
         return a;
     }
 
     public <A> A doWithWriter(final Function<GryoWriter, A> writerFunction) {
-        final GryoWriter gryoWriter = this.getWriter();
+        final GryoWriter gryoWriter = this.takeWriter();
         final A a = writerFunction.apply(gryoWriter);
-        this.addWriter(gryoWriter);
+        this.offerWriter(gryoWriter);
         return a;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e2d3eeea/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/TreeTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/TreeTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/TreeTest.java
index 9f403ba..7f46632 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/TreeTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/TreeTest.java
@@ -82,7 +82,7 @@ public abstract class TreeTest extends AbstractGremlinProcessTest {
             assertEquals(1, ((Map) tree.get(convertToVertexId("marko"))).size());
             assertTrue(((Map) tree.get(convertToVertexId("marko"))).containsKey(convertToVertexId("josh")));
             assertTrue(((Map) ((Map) tree.get(convertToVertexId("marko"))).get(convertToVertexId("josh"))).containsKey(convertToVertexId("lop")));
-            // TODO: fails randomly assertTrue(((Map) ((Map) tree.get(convertToVertexId("marko"))).get(convertToVertexId("josh"))).containsKey(convertToVertexId("ripple")));
+            assertTrue(((Map) ((Map) tree.get(convertToVertexId("marko"))).get(convertToVertexId("josh"))).containsKey(convertToVertexId("ripple")));
         });
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e2d3eeea/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
index 2b4e3f0..f23f4a8 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
@@ -57,18 +57,19 @@ public final class GiraphComputeVertex extends Vertex<LongWritable, VertexWritab
     @Override
     public void compute(final Iterable<ObjectWritable> messages) {
         final GiraphWorkerContext workerContext = (GiraphWorkerContext) this.getWorkerContext();
-        final VertexProgram vertexProgram = workerContext.getVertexProgram();
+        final VertexProgram vertexProgram = workerContext.getVertexProgramPool().take();
         final GiraphMemory memory = workerContext.getMemory();
         final GiraphMessenger messenger = workerContext.getMessenger(this, messages);
         final StrategyVertex wrappedVertex = ComputerDataStrategy.wrapVertex(this.getValue().get(), vertexProgram);
         ///////////
-        if (!(Boolean) ((RuleWritable) this.getAggregatedValue(Constants.GREMLIN_HADOOP_HALT)).getObject())
+        if (!(Boolean) ((RuleWritable) this.getAggregatedValue(Constants.GREMLIN_HADOOP_HALT)).getObject()) {
             vertexProgram.execute(wrappedVertex, messenger, memory);  // TODO provide a wrapper around TinkerVertex for Edge and non-ComputeKeys manipulation
-        else if (workerContext.deriveMemory()) {
+        } else if (workerContext.deriveMemory()) {
             final MapMemory mapMemory = new MapMemory();
             memory.asMap().forEach(mapMemory::set);
             mapMemory.setIteration(memory.getIteration() - 1);
             wrappedVertex.property(VertexProperty.Cardinality.single, Constants.MAP_MEMORY, mapMemory);  // TODO: this is a "computer key"
         }
+        workerContext.getVertexProgramPool().offer(vertexProgram);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e2d3eeea/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
index 1320c8f..8275ca7 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
@@ -88,7 +88,7 @@ public class GiraphGraphComputer extends Configured implements GraphComputer, To
         this.giraphConfiguration.setOutEdgesClass(EmptyOutEdges.class);
         this.giraphConfiguration.setClass(GiraphConstants.VERTEX_ID_CLASS.getKey(), LongWritable.class, LongWritable.class);
         this.giraphConfiguration.setClass(GiraphConstants.VERTEX_VALUE_CLASS.getKey(), VertexWritable.class, VertexWritable.class);
-        //this.giraphConfiguration.setBoolean("giraph.isStaticGraph", true);
+        this.giraphConfiguration.setBoolean(GiraphConstants.STATIC_GRAPH.getKey(), true);
         this.giraphConfiguration.setVertexInputFormatClass(GiraphVertexInputFormat.class);
         this.giraphConfiguration.setVertexOutputFormatClass(GiraphVertexOutputFormat.class);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e2d3eeea/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMessenger.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMessenger.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMessenger.java
index c34f9c6..c0bec5d 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMessenger.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMessenger.java
@@ -39,7 +39,7 @@ public class GiraphMessenger<M> implements Messenger<M> {
     private GiraphComputeVertex giraphComputeVertex;
     private Iterable<ObjectWritable<M>> messages;
 
-    public void setCurrentVertex(final GiraphComputeVertex giraphComputeVertex, final Iterable<ObjectWritable<M>> messages) {
+    public GiraphMessenger(final GiraphComputeVertex giraphComputeVertex, final Iterable<ObjectWritable<M>> messages) {
         this.giraphComputeVertex = giraphComputeVertex;
         this.messages = messages;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e2d3eeea/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphWorkerContext.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphWorkerContext.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphWorkerContext.java
index 55dd82c..a315c5c 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphWorkerContext.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphWorkerContext.java
@@ -18,21 +18,22 @@
  */
 package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph;
 
+import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.worker.WorkerContext;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.ImmutableMemory;
+import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramPool;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 public final class GiraphWorkerContext extends WorkerContext {
 
-    private VertexProgram<?> vertexProgram;
+    private VertexProgramPool vertexProgramPool;
     private GiraphMemory memory;
-    private GiraphMessenger messenger;
     private boolean deriveMemory;
 
     public GiraphWorkerContext() {
@@ -40,9 +41,10 @@ public final class GiraphWorkerContext extends WorkerContext {
     }
 
     public void preApplication() throws InstantiationException, IllegalAccessException {
-        this.vertexProgram = VertexProgram.createVertexProgram(ConfUtil.makeApacheConfiguration(this.getContext().getConfiguration()));
-        this.memory = new GiraphMemory(this, this.vertexProgram);
-        this.messenger = new GiraphMessenger();
+        this.vertexProgramPool = new VertexProgramPool(
+                this.getContext().getConfiguration().getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), 1),
+                ConfUtil.makeApacheConfiguration(this.getContext().getConfiguration()));
+        this.memory = new GiraphMemory(this, VertexProgram.createVertexProgram(ConfUtil.makeApacheConfiguration(this.getContext().getConfiguration())));
         this.deriveMemory = this.getContext().getConfiguration().getBoolean(Constants.GREMLIN_HADOOP_DERIVE_MEMORY, false);
     }
 
@@ -51,15 +53,15 @@ public final class GiraphWorkerContext extends WorkerContext {
     }
 
     public void preSuperstep() {
-        this.vertexProgram.workerIterationStart(new ImmutableMemory(this.memory));
+        this.vertexProgramPool.workerIterationStart(new ImmutableMemory(this.memory));
     }
 
     public void postSuperstep() {
-        this.vertexProgram.workerIterationEnd(new ImmutableMemory(this.memory));
+        this.vertexProgramPool.workerIterationEnd(new ImmutableMemory(this.memory));
     }
 
-    public VertexProgram<?> getVertexProgram() {
-        return this.vertexProgram;
+    public VertexProgramPool getVertexProgramPool() {
+        return this.vertexProgramPool;
     }
 
     public GiraphMemory getMemory() {
@@ -67,8 +69,7 @@ public final class GiraphWorkerContext extends WorkerContext {
     }
 
     public GiraphMessenger getMessenger(final GiraphComputeVertex giraphComputeVertex, final Iterable<ObjectWritable> messages) {
-        this.messenger.setCurrentVertex(giraphComputeVertex, messages);
-        return this.messenger;
+        return new GiraphMessenger(giraphComputeVertex, messages);
     }
 
     public boolean deriveMemory() {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e2d3eeea/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
index 0802501..d127132 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
@@ -25,7 +25,6 @@ import org.apache.tinkerpop.gremlin.AbstractGraphProvider;
 import org.apache.tinkerpop.gremlin.LoadGraphWith;
 import org.apache.tinkerpop.gremlin.TestHelper;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.GiraphGraphComputer;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkGraphComputer;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopEdge;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopElement;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
@@ -34,8 +33,6 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopProperty;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertex;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertexProperty;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONOutputFormat;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONResourceAccess;
@@ -117,10 +114,10 @@ public class HadoopGraphProvider extends AbstractGraphProvider {
             put("giraph.zkServerPort", "2181");  // you must have a local zookeeper running on this port
             put("giraph.nettyServerUseExecutionHandler", false); // this prevents so many integration tests running out of threads
             put("giraph.nettyClientUseExecutionHandler", false); // this prevents so many integration tests running out of threads
-            put("giraph.numInputThreads",4);
-            put("giraph.numComputeThreads",4);
-            put("giraph.vertexOutputFormatThreadSafe",true);
-            put("giraph.numOutputThreads",4);
+            put("giraph.numInputThreads", 3);
+            put("giraph.numComputeThreads", 3);
+            put("giraph.vertexOutputFormatThreadSafe", true);
+            put("giraph.numOutputThreads", 3);
 
 
             /// spark configuration