You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/18 15:20:59 UTC
incubator-tinkerpop git commit: introduced a VertexProgramPool for
threaded GraphComputer workers operating on a VertexProgram that is not
thread safe.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master 97ece24d0 -> e2d3eeeac
introduced a VertexProgramPool for threaded GraphComputer workers operating on a VertexProgram that is not thread safe.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/e2d3eeea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/e2d3eeea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/e2d3eeea
Branch: refs/heads/master
Commit: e2d3eeeace615ae7b3a515e88627183ae7577a52
Parents: 97ece24
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Mar 18 08:20:56 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Mar 18 08:20:56 2015 -0600
----------------------------------------------------------------------
.../traversal/TraversalVertexProgram.java | 20 +++---
.../computer/util/VertexProgramPool.java | 71 ++++++++++++++++++++
.../graph/traversal/step/map/TreeStep.java | 4 +-
.../gremlin/structure/io/gryo/GryoPool.java | 24 +++----
.../traversal/step/sideEffect/TreeTest.java | 2 +-
.../computer/giraph/GiraphComputeVertex.java | 7 +-
.../computer/giraph/GiraphGraphComputer.java | 2 +-
.../computer/giraph/GiraphMessenger.java | 2 +-
.../computer/giraph/GiraphWorkerContext.java | 23 ++++---
.../gremlin/hadoop/HadoopGraphProvider.java | 11 ++-
10 files changed, 119 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e2d3eeea/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
index 3732fab..527ce1f 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
@@ -141,19 +141,19 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
@Override
public void execute(final Vertex vertex, final Messenger<TraverserSet<?>> messenger, final Memory memory) {
this.traversal.getSideEffects().setLocalVertex(vertex);
- if (memory.isInitialIteration()) {
+ if (memory.isInitialIteration()) { // ITERATION 1
final TraverserSet<Object> haltedTraversers = new TraverserSet<>();
vertex.property(HALTED_TRAVERSERS, haltedTraversers);
- if (!(this.traversal.getStartStep() instanceof GraphStep)) // TODO: make this generic to Traversal
+ if (!(this.traversal.getStartStep() instanceof GraphStep))
throw new UnsupportedOperationException("TraversalVertexProgram currently only supports GraphStep starts on vertices or edges");
- final GraphStep<Element> startStep = (GraphStep<Element>) this.traversal.getStartStep();
+ final GraphStep<Element> graphStep = (GraphStep<Element>) this.traversal.getStartStep();
+ final String future = graphStep.getNextStep().getId();
final TraverserGenerator traverserGenerator = this.traversal.getTraverserGenerator();
- final String future = startStep.getNextStep().getId();
- if (startStep.returnsVertices()) { // VERTICES (process the first step locally)
- if (ElementHelper.idExists(vertex.id(), startStep.getIds())) {
- final Traverser.Admin<Element> traverser = traverserGenerator.generate(vertex, startStep, 1l);
+ if (graphStep.returnsVertices()) { // VERTICES (process the first step locally)
+ if (ElementHelper.idExists(vertex.id(), graphStep.getIds())) {
+ final Traverser.Admin<Element> traverser = traverserGenerator.generate(vertex, graphStep, 1l);
traverser.setStepId(future);
traverser.detach();
if (traverser.isHalted())
@@ -166,8 +166,8 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
final Iterator<Edge> starts = vertex.edges(Direction.OUT);
while (starts.hasNext()) {
final Edge start = starts.next();
- if (ElementHelper.idExists(start.id(), startStep.getIds())) {
- final Traverser.Admin<Element> traverser = traverserGenerator.generate(start, startStep, 1l);
+ if (ElementHelper.idExists(start.id(), graphStep.getIds())) {
+ final Traverser.Admin<Element> traverser = traverserGenerator.generate(start, graphStep, 1l);
traverser.setStepId(future);
traverser.detach();
if (traverser.isHalted())
@@ -180,7 +180,7 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
}
memory.and(VOTE_TO_HALT, voteToHalt);
}
- } else {
+ } else { // ITERATION 1+
memory.and(VOTE_TO_HALT, TraverserExecutor.execute(vertex, messenger, this.traversalMatrix));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e2d3eeea/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramPool.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramPool.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramPool.java
new file mode 100644
index 0000000..0be3719
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramPool.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.computer.util;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A fixed-capacity pool of {@link VertexProgram} instances, intended for threaded
+ * GraphComputer workers operating on a VertexProgram that is not thread safe: each
+ * worker thread checks out its own instance with {@link #take()} and hands it back
+ * with {@link #offer(VertexProgram)} when it is done.
+ * <p>
+ * The pool is filled to capacity at construction time; every instance is built from
+ * the same configuration via {@code VertexProgram.createVertexProgram(Configuration)}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class VertexProgramPool {
+
+    /** Maximum time, in milliseconds, that {@link #take()} and {@link #offer(VertexProgram)} will block. */
+    private static final int TIMEOUT_MS = 2500;
+
+    private final LinkedBlockingQueue<VertexProgram<?>> pool;
+
+    /**
+     * Creates the pool and eagerly populates it with {@code poolSize} vertex programs.
+     *
+     * @param poolSize      the number of vertex programs to create; it is used as the queue
+     *                      capacity, so a non-positive value is rejected by
+     *                      {@link LinkedBlockingQueue} with an {@link IllegalArgumentException}
+     * @param configuration the configuration each vertex program is created from
+     */
+    public VertexProgramPool(final int poolSize, final Configuration configuration) {
+        this.pool = new LinkedBlockingQueue<>(poolSize);
+        while (this.pool.remainingCapacity() > 0) {
+            this.pool.add(VertexProgram.createVertexProgram(configuration));
+        }
+    }
+
+    /**
+     * Checks a vertex program out of the pool, waiting up to {@code TIMEOUT_MS} milliseconds
+     * for one to become available.
+     *
+     * @return a vertex program, never {@code null}
+     * @throws IllegalStateException if the calling thread is interrupted while waiting, or if
+     *                               no vertex program becomes available before the timeout
+     */
+    public VertexProgram take() {
+        final VertexProgram<?> vertexProgram;
+        try {
+            vertexProgram = this.pool.poll(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        } catch (final InterruptedException e) {
+            // restore the interrupt status so code further up the stack can still observe it
+            Thread.currentThread().interrupt();
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+        // a timed poll() yields null on timeout; fail here with a meaningful message instead of
+        // handing the caller a null that only blows up later as a NullPointerException
+        if (null == vertexProgram)
+            throw new IllegalStateException("No vertex program became available in the pool within " + TIMEOUT_MS + "ms");
+        return vertexProgram;
+    }
+
+    /**
+     * Returns a vertex program to the pool, waiting up to {@code TIMEOUT_MS} milliseconds for
+     * space if the pool is full. This is best-effort: if the pool is still full after the
+     * timeout, the vertex program is not added and no error is raised.
+     *
+     * @param vertexProgram the vertex program to hand back; must not be {@code null}
+     *                      ({@link LinkedBlockingQueue} rejects {@code null} elements)
+     * @throws IllegalStateException if the calling thread is interrupted while waiting
+     */
+    public void offer(final VertexProgram<?> vertexProgram) {
+        try {
+            // the boolean result is intentionally not acted upon (see the method contract above)
+            this.pool.offer(vertexProgram, TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        } catch (final InterruptedException e) {
+            // restore the interrupt status so code further up the stack can still observe it
+            Thread.currentThread().interrupt();
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Calls {@code workerIterationStart(memory)} on every vertex program that is currently in
+     * the pool. Instances that are checked out at the time of the call are not visited, so
+     * callers should invoke this only when all vertex programs have been returned.
+     *
+     * @param memory the memory passed through to each vertex program
+     */
+    public synchronized void workerIterationStart(final Memory memory) {
+        for (final VertexProgram<?> vertexProgram : this.pool) {
+            vertexProgram.workerIterationStart(memory);
+        }
+    }
+
+    /**
+     * Calls {@code workerIterationEnd(memory)} on every vertex program that is currently in
+     * the pool. Instances that are checked out at the time of the call are not visited, so
+     * callers should invoke this only when all vertex programs have been returned.
+     *
+     * @param memory the memory passed through to each vertex program
+     */
+    public synchronized void workerIterationEnd(final Memory memory) {
+        for (final VertexProgram<?> vertexProgram : this.pool) {
+            vertexProgram.workerIterationEnd(memory);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e2d3eeea/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/TreeStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/TreeStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/TreeStep.java
index cdb62c5..046d535 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/TreeStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/TreeStep.java
@@ -160,7 +160,9 @@ public final class TreeStep<S> extends ReducingBarrierStep<S, Tree> implements M
@Override
public Tree generateFinalResult(final Iterator<KeyValue<NullObject, Tree>> keyValues) {
- return keyValues.hasNext() ? keyValues.next().getValue() : new Tree();
+ final Tree tree = new Tree();
+ keyValues.forEachRemaining(keyValue -> tree.addTree(keyValue.getValue()));
+ return tree;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e2d3eeea/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
index 5a55062..0b09c5d 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
@@ -37,46 +37,46 @@ public class GryoPool {
this.gryoWriters = new ConcurrentLinkedQueue<>();
}
- public GryoReader getReader() {
+ public GryoReader takeReader() {
final GryoReader reader = this.gryoReaders.poll();
return (null == reader) ? GryoReader.build().create() : reader;
}
- public GryoWriter getWriter() {
+ public GryoWriter takeWriter() {
final GryoWriter writer = this.gryoWriters.poll();
return (null == writer) ? GryoWriter.build().create() : writer;
}
- public void addReader(final GryoReader gryoReader) {
+ public void offerReader(final GryoReader gryoReader) {
if (this.gryoReaders.size() < MAX_QUEUE_SIZE)
this.gryoReaders.offer(gryoReader);
}
- public void addWriter(final GryoWriter gryoWriter) {
+ public void offerWriter(final GryoWriter gryoWriter) {
if (this.gryoWriters.size() < MAX_QUEUE_SIZE)
this.gryoWriters.offer(gryoWriter);
}
public <A> A doWithReaderWriter(final BiFunction<GryoReader, GryoWriter, A> readerWriterBiFunction) {
- final GryoReader gryoReader = this.getReader();
- final GryoWriter gryoWriter = this.getWriter();
+ final GryoReader gryoReader = this.takeReader();
+ final GryoWriter gryoWriter = this.takeWriter();
final A a = readerWriterBiFunction.apply(gryoReader, gryoWriter);
- this.addReader(gryoReader);
- this.addWriter(gryoWriter);
+ this.offerReader(gryoReader);
+ this.offerWriter(gryoWriter);
return a;
}
public <A> A doWithReader(final Function<GryoReader, A> readerFunction) {
- final GryoReader gryoReader = this.getReader();
+ final GryoReader gryoReader = this.takeReader();
final A a = readerFunction.apply(gryoReader);
- this.addReader(gryoReader);
+ this.offerReader(gryoReader);
return a;
}
public <A> A doWithWriter(final Function<GryoWriter, A> writerFunction) {
- final GryoWriter gryoWriter = this.getWriter();
+ final GryoWriter gryoWriter = this.takeWriter();
final A a = writerFunction.apply(gryoWriter);
- this.addWriter(gryoWriter);
+ this.offerWriter(gryoWriter);
return a;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e2d3eeea/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/TreeTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/TreeTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/TreeTest.java
index 9f403ba..7f46632 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/TreeTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/TreeTest.java
@@ -82,7 +82,7 @@ public abstract class TreeTest extends AbstractGremlinProcessTest {
assertEquals(1, ((Map) tree.get(convertToVertexId("marko"))).size());
assertTrue(((Map) tree.get(convertToVertexId("marko"))).containsKey(convertToVertexId("josh")));
assertTrue(((Map) ((Map) tree.get(convertToVertexId("marko"))).get(convertToVertexId("josh"))).containsKey(convertToVertexId("lop")));
- // TODO: fails randomly assertTrue(((Map) ((Map) tree.get(convertToVertexId("marko"))).get(convertToVertexId("josh"))).containsKey(convertToVertexId("ripple")));
+ assertTrue(((Map) ((Map) tree.get(convertToVertexId("marko"))).get(convertToVertexId("josh"))).containsKey(convertToVertexId("ripple")));
});
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e2d3eeea/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
index 2b4e3f0..f23f4a8 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
@@ -57,18 +57,19 @@ public final class GiraphComputeVertex extends Vertex<LongWritable, VertexWritab
@Override
public void compute(final Iterable<ObjectWritable> messages) {
final GiraphWorkerContext workerContext = (GiraphWorkerContext) this.getWorkerContext();
- final VertexProgram vertexProgram = workerContext.getVertexProgram();
+ final VertexProgram vertexProgram = workerContext.getVertexProgramPool().take();
final GiraphMemory memory = workerContext.getMemory();
final GiraphMessenger messenger = workerContext.getMessenger(this, messages);
final StrategyVertex wrappedVertex = ComputerDataStrategy.wrapVertex(this.getValue().get(), vertexProgram);
///////////
- if (!(Boolean) ((RuleWritable) this.getAggregatedValue(Constants.GREMLIN_HADOOP_HALT)).getObject())
+ if (!(Boolean) ((RuleWritable) this.getAggregatedValue(Constants.GREMLIN_HADOOP_HALT)).getObject()) {
vertexProgram.execute(wrappedVertex, messenger, memory); // TODO provide a wrapper around TinkerVertex for Edge and non-ComputeKeys manipulation
- else if (workerContext.deriveMemory()) {
+ } else if (workerContext.deriveMemory()) {
final MapMemory mapMemory = new MapMemory();
memory.asMap().forEach(mapMemory::set);
mapMemory.setIteration(memory.getIteration() - 1);
wrappedVertex.property(VertexProperty.Cardinality.single, Constants.MAP_MEMORY, mapMemory); // TODO: this is a "computer key"
}
+ workerContext.getVertexProgramPool().offer(vertexProgram);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e2d3eeea/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
index 1320c8f..8275ca7 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
@@ -88,7 +88,7 @@ public class GiraphGraphComputer extends Configured implements GraphComputer, To
this.giraphConfiguration.setOutEdgesClass(EmptyOutEdges.class);
this.giraphConfiguration.setClass(GiraphConstants.VERTEX_ID_CLASS.getKey(), LongWritable.class, LongWritable.class);
this.giraphConfiguration.setClass(GiraphConstants.VERTEX_VALUE_CLASS.getKey(), VertexWritable.class, VertexWritable.class);
- //this.giraphConfiguration.setBoolean("giraph.isStaticGraph", true);
+ this.giraphConfiguration.setBoolean(GiraphConstants.STATIC_GRAPH.getKey(), true);
this.giraphConfiguration.setVertexInputFormatClass(GiraphVertexInputFormat.class);
this.giraphConfiguration.setVertexOutputFormatClass(GiraphVertexOutputFormat.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e2d3eeea/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMessenger.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMessenger.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMessenger.java
index c34f9c6..c0bec5d 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMessenger.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMessenger.java
@@ -39,7 +39,7 @@ public class GiraphMessenger<M> implements Messenger<M> {
private GiraphComputeVertex giraphComputeVertex;
private Iterable<ObjectWritable<M>> messages;
- public void setCurrentVertex(final GiraphComputeVertex giraphComputeVertex, final Iterable<ObjectWritable<M>> messages) {
+ public GiraphMessenger(final GiraphComputeVertex giraphComputeVertex, final Iterable<ObjectWritable<M>> messages) {
this.giraphComputeVertex = giraphComputeVertex;
this.messages = messages;
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e2d3eeea/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphWorkerContext.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphWorkerContext.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphWorkerContext.java
index 55dd82c..a315c5c 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphWorkerContext.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphWorkerContext.java
@@ -18,21 +18,22 @@
*/
package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph;
+import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.worker.WorkerContext;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.ImmutableMemory;
+import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramPool;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public final class GiraphWorkerContext extends WorkerContext {
- private VertexProgram<?> vertexProgram;
+ private VertexProgramPool vertexProgramPool;
private GiraphMemory memory;
- private GiraphMessenger messenger;
private boolean deriveMemory;
public GiraphWorkerContext() {
@@ -40,9 +41,10 @@ public final class GiraphWorkerContext extends WorkerContext {
}
public void preApplication() throws InstantiationException, IllegalAccessException {
- this.vertexProgram = VertexProgram.createVertexProgram(ConfUtil.makeApacheConfiguration(this.getContext().getConfiguration()));
- this.memory = new GiraphMemory(this, this.vertexProgram);
- this.messenger = new GiraphMessenger();
+ this.vertexProgramPool = new VertexProgramPool(
+ this.getContext().getConfiguration().getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), 1),
+ ConfUtil.makeApacheConfiguration(this.getContext().getConfiguration()));
+ this.memory = new GiraphMemory(this, VertexProgram.createVertexProgram(ConfUtil.makeApacheConfiguration(this.getContext().getConfiguration())));
this.deriveMemory = this.getContext().getConfiguration().getBoolean(Constants.GREMLIN_HADOOP_DERIVE_MEMORY, false);
}
@@ -51,15 +53,15 @@ public final class GiraphWorkerContext extends WorkerContext {
}
public void preSuperstep() {
- this.vertexProgram.workerIterationStart(new ImmutableMemory(this.memory));
+ this.vertexProgramPool.workerIterationStart(new ImmutableMemory(this.memory));
}
public void postSuperstep() {
- this.vertexProgram.workerIterationEnd(new ImmutableMemory(this.memory));
+ this.vertexProgramPool.workerIterationEnd(new ImmutableMemory(this.memory));
}
- public VertexProgram<?> getVertexProgram() {
- return this.vertexProgram;
+ public VertexProgramPool getVertexProgramPool() {
+ return this.vertexProgramPool;
}
public GiraphMemory getMemory() {
@@ -67,8 +69,7 @@ public final class GiraphWorkerContext extends WorkerContext {
}
public GiraphMessenger getMessenger(final GiraphComputeVertex giraphComputeVertex, final Iterable<ObjectWritable> messages) {
- this.messenger.setCurrentVertex(giraphComputeVertex, messages);
- return this.messenger;
+ return new GiraphMessenger(giraphComputeVertex, messages);
}
public boolean deriveMemory() {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e2d3eeea/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
index 0802501..d127132 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
@@ -25,7 +25,6 @@ import org.apache.tinkerpop.gremlin.AbstractGraphProvider;
import org.apache.tinkerpop.gremlin.LoadGraphWith;
import org.apache.tinkerpop.gremlin.TestHelper;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.GiraphGraphComputer;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkGraphComputer;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopEdge;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopElement;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
@@ -34,8 +33,6 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopProperty;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertex;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertexProperty;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONOutputFormat;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONResourceAccess;
@@ -117,10 +114,10 @@ public class HadoopGraphProvider extends AbstractGraphProvider {
put("giraph.zkServerPort", "2181"); // you must have a local zookeeper running on this port
put("giraph.nettyServerUseExecutionHandler", false); // this prevents so many integration tests running out of threads
put("giraph.nettyClientUseExecutionHandler", false); // this prevents so many integration tests running out of threads
- put("giraph.numInputThreads",4);
- put("giraph.numComputeThreads",4);
- put("giraph.vertexOutputFormatThreadSafe",true);
- put("giraph.numOutputThreads",4);
+ put("giraph.numInputThreads", 3);
+ put("giraph.numComputeThreads", 3);
+ put("giraph.vertexOutputFormatThreadSafe", true);
+ put("giraph.numOutputThreads", 3);
/// spark configuration