You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/09/10 21:37:32 UTC
incubator-tinkerpop git commit: Renamed GiraphComputeVertex to
GiraphVertex, since Giraph 1.1.0 distinguishes between Computation and
Vertex. Fixed the sort bug in Giraph (a somewhat hacky fix; need to learn more
about Hadoop 2 reduce-side sorting). Fixed some directory-structure issues with Spark and Giraph.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master a384d554e -> d28ef7884
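Renamed GiraphComputeVertex to GiraphVertex, since Giraph 1.1.0 distinguishes between Computation and Vertex. Fixed the sort bug in Giraph by running the reduce-sort job with a single reduce task (a somewhat hacky fix; need to learn more about Hadoop 2 reduce-side sorting). Fixed some directory-structure issues with Spark and Giraph: the test output location is now the relative path target/test-output instead of a module-prefixed path (again, Hadoop 2 HDFS has an unusual path/scheme model).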
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/d28ef788
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/d28ef788
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/d28ef788
Branch: refs/heads/master
Commit: d28ef788417485b47bef20c571d3f1517b06ac0b
Parents: a384d55
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Sep 10 13:37:57 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Sep 10 13:37:57 2015 -0600
----------------------------------------------------------------------
.../groovy/plugin/GiraphGremlinPlugin.java | 5 ---
.../process/computer/GiraphComputation.java | 2 +-
.../process/computer/GiraphComputeVertex.java | 39 --------------------
.../process/computer/GiraphGraphComputer.java | 2 +-
.../process/computer/GiraphMessenger.java | 8 ++--
.../giraph/process/computer/GiraphVertex.java | 39 ++++++++++++++++++++
.../process/computer/GiraphWorkerContext.java | 4 +-
.../process/computer/io/GiraphVertexReader.java | 4 +-
.../process/computer/io/GiraphVertexWriter.java | 4 +-
.../groovy/plugin/HadoopGremlinPluginTest.java | 8 ++--
.../computer/HadoopGiraphGraphProvider.java | 8 ++--
.../process/computer/util/MapReduceHelper.java | 1 +
.../groovy/plugin/HadoopGremlinPluginTest.java | 8 ++--
.../computer/HadoopSparkGraphProvider.java | 4 +-
14 files changed, 66 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d28ef788/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/groovy/plugin/GiraphGremlinPlugin.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/groovy/plugin/GiraphGremlinPlugin.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/groovy/plugin/GiraphGremlinPlugin.java
index 8518871..03a974a 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/groovy/plugin/GiraphGremlinPlugin.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/groovy/plugin/GiraphGremlinPlugin.java
@@ -53,11 +53,6 @@ public final class GiraphGremlinPlugin extends AbstractGremlinPlugin {
@Override
public void afterPluginTo(final PluginAcceptor pluginAcceptor) throws PluginInitializationException, IllegalEnvironmentException {
pluginAcceptor.addImports(IMPORTS);
- /*try {
- pluginAcceptor.eval(String.format("LoggerFactory.getLogger(%s).setLevel(Level.INFO)", GiraphGraphComputer.class.getName()));
- } catch (final Exception e) {
- throw new PluginInitializationException(e.getMessage(), e);
- }*/
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d28ef788/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphComputation.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphComputation.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphComputation.java
index bb3c21c..1d52566 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphComputation.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphComputation.java
@@ -41,7 +41,7 @@ public final class GiraphComputation extends BasicComputation<ObjectWritable, Ve
public void compute(final Vertex<ObjectWritable, VertexWritable, NullWritable> vertex, final Iterable<ObjectWritable> messages) throws IOException {
final GiraphWorkerContext workerContext = this.getWorkerContext();
final VertexProgram<?> vertexProgram = workerContext.getVertexProgramPool().take();
- vertexProgram.execute(ComputerGraph.vertexProgram(vertex.getValue().get(), vertexProgram), workerContext.getMessenger((GiraphComputeVertex) vertex, this, messages.iterator()), workerContext.getMemory());
+ vertexProgram.execute(ComputerGraph.vertexProgram(vertex.getValue().get(), vertexProgram), workerContext.getMessenger((GiraphVertex) vertex, this, messages.iterator()), workerContext.getMemory());
workerContext.getVertexProgramPool().offer(vertexProgram);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d28ef788/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphComputeVertex.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphComputeVertex.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphComputeVertex.java
deleted file mode 100644
index 1f14383..0000000
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphComputeVertex.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.giraph.process.computer;
-
-import org.apache.giraph.graph.DefaultVertex;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GiraphComputeVertex extends DefaultVertex<ObjectWritable, VertexWritable, NullWritable> {
-
- public GiraphComputeVertex() {
- }
-
- public GiraphComputeVertex(final VertexWritable vertexWritable) {
- final VertexWritable newWritable = new VertexWritable();
- newWritable.set(vertexWritable.get());
- this.initialize(new ObjectWritable<>(newWritable.get().id()), newWritable, EmptyOutEdges.instance());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d28ef788/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index ce718b3..48c4ab9 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -70,7 +70,7 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
final Configuration configuration = hadoopGraph.configuration();
configuration.getKeys().forEachRemaining(key -> this.giraphConfiguration.set(key, configuration.getProperty(key).toString()));
this.giraphConfiguration.setMasterComputeClass(GiraphMemory.class);
- this.giraphConfiguration.setVertexClass(GiraphComputeVertex.class);
+ this.giraphConfiguration.setVertexClass(GiraphVertex.class);
this.giraphConfiguration.setComputationClass(GiraphComputation.class);
this.giraphConfiguration.setWorkerContextClass(GiraphWorkerContext.class);
this.giraphConfiguration.setOutEdgesClass(EmptyOutEdges.class);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d28ef788/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMessenger.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMessenger.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMessenger.java
index 2b9ce4f..4abb3e7 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMessenger.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMessenger.java
@@ -37,12 +37,12 @@ import java.util.Iterator;
*/
public final class GiraphMessenger<M> implements Messenger<M> {
- private GiraphComputeVertex giraphComputeVertex;
+ private GiraphVertex giraphVertex;
private GiraphComputation giraphComputation;
private Iterator<ObjectWritable<M>> messages;
- public GiraphMessenger(final GiraphComputeVertex giraphComputeVertex, final GiraphComputation giraphComputation, final Iterator<ObjectWritable<M>> messages) {
- this.giraphComputeVertex = giraphComputeVertex;
+ public GiraphMessenger(final GiraphVertex giraphVertex, final GiraphComputation giraphComputation, final Iterator<ObjectWritable<M>> messages) {
+ this.giraphVertex = giraphVertex;
this.giraphComputation = giraphComputation;
this.messages = messages;
}
@@ -56,7 +56,7 @@ public final class GiraphMessenger<M> implements Messenger<M> {
public void sendMessage(final MessageScope messageScope, final M message) {
if (messageScope instanceof MessageScope.Local) {
final MessageScope.Local<M> localMessageScope = (MessageScope.Local) messageScope;
- final Traversal.Admin<Vertex, Edge> incidentTraversal = GiraphMessenger.setVertexStart(localMessageScope.getIncidentTraversal().get(), this.giraphComputeVertex.getValue().get());
+ final Traversal.Admin<Vertex, Edge> incidentTraversal = GiraphMessenger.setVertexStart(localMessageScope.getIncidentTraversal().get(), this.giraphVertex.getValue().get());
final Direction direction = GiraphMessenger.getOppositeDirection(incidentTraversal);
incidentTraversal.forEachRemaining(edge ->
this.giraphComputation.sendMessage(
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d28ef788/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphVertex.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphVertex.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphVertex.java
new file mode 100644
index 0000000..3a95fae
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphVertex.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.giraph.process.computer;
+
+import org.apache.giraph.graph.DefaultVertex;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+
+/** Giraph vertex whose value is a {@link VertexWritable} and whose id is an {@link ObjectWritable} holding that vertex's id.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphVertex extends DefaultVertex<ObjectWritable, VertexWritable, NullWritable> {
+ // No-arg constructor; presumably required so Giraph can instantiate this class reflectively (it is registered via setVertexClass) — confirm.
+ public GiraphVertex() {
+ }
+ // Builds a vertex from a VertexWritable; GiraphVertexReader.getCurrentVertex() passes recordReader.getCurrentValue() here.
+ public GiraphVertex(final VertexWritable vertexWritable) {
+ final VertexWritable newWritable = new VertexWritable(); // NOTE(review): fresh wrapper, presumably because the RecordReader may reuse its current value — confirm
+ newWritable.set(vertexWritable.get()); // re-wraps the same object returned by get(); this is not a deep copy unless VertexWritable.set copies — verify
+ this.initialize(new ObjectWritable<>(newWritable.get().id()), newWritable, EmptyOutEdges.instance()); // id = wrapped vertex's id; Giraph out-edges left empty (EmptyOutEdges is also the configured OutEdges class)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d28ef788/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphWorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphWorkerContext.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphWorkerContext.java
index 3f0b6fa..86b733c 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphWorkerContext.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphWorkerContext.java
@@ -71,7 +71,7 @@ public final class GiraphWorkerContext extends WorkerContext {
return this.memory;
}
- public GiraphMessenger getMessenger(final GiraphComputeVertex giraphComputeVertex, final GiraphComputation giraphComputation, final Iterator<ObjectWritable> messages) {
- return new GiraphMessenger(giraphComputeVertex, giraphComputation, messages);
+ public GiraphMessenger getMessenger(final GiraphVertex giraphVertex, final GiraphComputation giraphComputation, final Iterator<ObjectWritable> messages) {
+ return new GiraphMessenger(giraphVertex, giraphComputation, messages);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d28ef788/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexReader.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexReader.java
index 3313694..c8ed797 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexReader.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexReader.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.tinkerpop.gremlin.giraph.process.computer.GiraphComputeVertex;
+import org.apache.tinkerpop.gremlin.giraph.process.computer.GiraphVertex;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import java.io.IOException;
@@ -52,7 +52,7 @@ public final class GiraphVertexReader extends VertexReader {
@Override
public Vertex getCurrentVertex() throws IOException, InterruptedException {
- return new GiraphComputeVertex(this.recordReader.getCurrentValue());
+ return new GiraphVertex(this.recordReader.getCurrentValue());
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d28ef788/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexWriter.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexWriter.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexWriter.java
index 5c16d6b..d67b736 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexWriter.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexWriter.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.tinkerpop.gremlin.giraph.process.computer.GiraphComputeVertex;
+import org.apache.tinkerpop.gremlin.giraph.process.computer.GiraphVertex;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import java.io.IOException;
@@ -52,6 +52,6 @@ public final class GiraphVertexWriter extends VertexWriter {
@Override
public void writeVertex(final Vertex vertex) throws IOException, InterruptedException {
- this.recordWriter.write(NullWritable.get(), ((GiraphComputeVertex) vertex).getValue());
+ this.recordWriter.write(NullWritable.get(), ((GiraphVertex) vertex).getValue());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d28ef788/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/groovy/plugin/HadoopGremlinPluginTest.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/groovy/plugin/HadoopGremlinPluginTest.java b/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/groovy/plugin/HadoopGremlinPluginTest.java
index af24330..37d7145 100644
--- a/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/groovy/plugin/HadoopGremlinPluginTest.java
+++ b/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/groovy/plugin/HadoopGremlinPluginTest.java
@@ -133,9 +133,9 @@ public class HadoopGremlinPluginTest extends AbstractGremlinTest {
this.remote.connect(Arrays.asList("graph", "g"));
Traversal<Vertex, String> traversal = (Traversal<Vertex, String>) this.remote.submit(Arrays.asList("g.V().hasLabel('person').group('m').by('age').by('name').out('knows').out('created').values('name')"));
AbstractGremlinProcessTest.checkResults(Arrays.asList("ripple", "lop"), traversal);
- assertTrue((Boolean) this.console.eval("hdfs.exists('giraph-gremlin/target/test-output/m')"));
- assertTrue((Boolean) this.console.eval("hdfs.exists('giraph-gremlin/target/test-output/" + TraverserMapReduce.TRAVERSERS + "')"));
- final List<KeyValue<Integer, BulkSet<String>>> mList = IteratorUtils.asList(this.console.eval("hdfs.head('giraph-gremlin/target/test-output/m',ObjectWritable)"));
+ assertTrue((Boolean) this.console.eval("hdfs.exists('target/test-output/m')"));
+ assertTrue((Boolean) this.console.eval("hdfs.exists('target/test-output/" + TraverserMapReduce.TRAVERSERS + "')"));
+ final List<KeyValue<Integer, BulkSet<String>>> mList = IteratorUtils.asList(this.console.eval("hdfs.head('target/test-output/m',ObjectWritable)"));
assertEquals(4, mList.size());
mList.forEach(keyValue -> {
if (keyValue.getKey().equals(29))
@@ -149,7 +149,7 @@ public class HadoopGremlinPluginTest extends AbstractGremlinTest {
else
throw new IllegalStateException("The provided key/value is unknown: " + keyValue);
});
- final List<KeyValue<MapReduce.NullObject, Traverser<String>>> traversersList = IteratorUtils.asList(this.console.eval("hdfs.head('giraph-gremlin/target/test-output/" + TraverserMapReduce.TRAVERSERS + "',ObjectWritable)"));
+ final List<KeyValue<MapReduce.NullObject, Traverser<String>>> traversersList = IteratorUtils.asList(this.console.eval("hdfs.head('target/test-output/" + TraverserMapReduce.TRAVERSERS + "',ObjectWritable)"));
assertEquals(2, traversersList.size());
traversersList.forEach(keyValue -> {
assertEquals(MapReduce.NullObject.instance(), keyValue.getKey());
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d28ef788/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/HadoopGiraphGraphProvider.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/HadoopGiraphGraphProvider.java b/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/HadoopGiraphGraphProvider.java
index 1b3bdc4..86007f8 100644
--- a/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/HadoopGiraphGraphProvider.java
+++ b/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/HadoopGiraphGraphProvider.java
@@ -111,8 +111,9 @@ public final class HadoopGiraphGraphProvider extends AbstractGraphProvider {
put(Graph.GRAPH, HadoopGraph.class.getName());
put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, graphSONInput ? GraphSONInputFormat.class.getCanonicalName() : GryoInputFormat.class.getCanonicalName());
put(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GryoOutputFormat.class.getCanonicalName());
- put(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "giraph-gremlin/target/test-output");
+ put(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
put(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+ put("mapreduce.job.reduces", 4);
/// giraph configuration
put(GiraphConstants.MIN_WORKERS, 1);
put(GiraphConstants.MAX_WORKERS, 1);
@@ -123,9 +124,8 @@ public final class HadoopGiraphGraphProvider extends AbstractGraphProvider {
put(GiraphConstants.NUM_INPUT_THREADS.getKey(), 3);
put(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), 3);
put(GiraphConstants.MAX_MASTER_SUPERSTEP_WAIT_MSECS.getKey(), TimeUnit.MINUTES.toMillis(60L));
- put("mapreduce.job.reduces", 4);
- put("giraph.vertexOutputFormatThreadSafe", false);
- put("giraph.numOutputThreads", 3);
+ put(GiraphConstants.VERTEX_OUTPUT_FORMAT_THREAD_SAFE.getKey(), false);
+ put(GiraphConstants.NUM_OUTPUT_THREADS.getKey(), 3);
}};
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d28ef788/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java
index 56af510..e7522c2 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java
@@ -121,6 +121,7 @@ public final class MapReduceHelper {
reduceSortJob.setOutputValueClass(ObjectWritable.class);
reduceSortJob.setInputFormatClass(SequenceFileInputFormat.class);
reduceSortJob.setOutputFormatClass(SequenceFileOutputFormat.class);
+ reduceSortJob.setNumReduceTasks(1); // todo: is this necessary to ensure sorted order?
FileInputFormat.setInputPaths(reduceSortJob, memoryPath);
final Path sortedMemoryPath = new Path(newConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + mapReduce.getMemoryKey());
FileOutputFormat.setOutputPath(reduceSortJob, sortedMemoryPath);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d28ef788/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/HadoopGremlinPluginTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/HadoopGremlinPluginTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/HadoopGremlinPluginTest.java
index 84e42d8..77d568c 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/HadoopGremlinPluginTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/HadoopGremlinPluginTest.java
@@ -133,9 +133,9 @@ public class HadoopGremlinPluginTest extends AbstractGremlinTest {
this.remote.connect(Arrays.asList("graph", "g"));
Traversal<Vertex, String> traversal = (Traversal<Vertex, String>) this.remote.submit(Arrays.asList("g.V().hasLabel('person').group('m').by('age').by('name').out('knows').out('created').values('name')"));
AbstractGremlinProcessTest.checkResults(Arrays.asList("ripple", "lop"), traversal);
- assertTrue((Boolean) this.console.eval("hdfs.exists('spark-gremlin/target/test-output/m')"));
- assertTrue((Boolean) this.console.eval("hdfs.exists('spark-gremlin/target/test-output/" + TraverserMapReduce.TRAVERSERS + "')"));
- final List<KeyValue<Integer, BulkSet<String>>> mList = IteratorUtils.asList(this.console.eval("hdfs.head('spark-gremlin/target/test-output/m',ObjectWritable)"));
+ assertTrue((Boolean) this.console.eval("hdfs.exists('target/test-output/m')"));
+ assertTrue((Boolean) this.console.eval("hdfs.exists('target/test-output/" + TraverserMapReduce.TRAVERSERS + "')"));
+ final List<KeyValue<Integer, BulkSet<String>>> mList = IteratorUtils.asList(this.console.eval("hdfs.head('target/test-output/m',ObjectWritable)"));
assertEquals(4, mList.size());
mList.forEach(keyValue -> {
if (keyValue.getKey().equals(29))
@@ -149,7 +149,7 @@ public class HadoopGremlinPluginTest extends AbstractGremlinTest {
else
throw new IllegalStateException("The provided key/value is unknown: " + keyValue);
});
- final List<KeyValue<MapReduce.NullObject, Traverser<String>>> traversersList = IteratorUtils.asList(this.console.eval("hdfs.head('spark-gremlin/target/test-output/" + TraverserMapReduce.TRAVERSERS + "',ObjectWritable)"));
+ final List<KeyValue<MapReduce.NullObject, Traverser<String>>> traversersList = IteratorUtils.asList(this.console.eval("hdfs.head('target/test-output/" + TraverserMapReduce.TRAVERSERS + "',ObjectWritable)"));
assertEquals(2, traversersList.size());
traversersList.forEach(keyValue -> {
assertEquals(MapReduce.NullObject.instance(), keyValue.getKey());
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d28ef788/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java
index 22aa3da..c7642c8 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java
@@ -109,9 +109,9 @@ public final class HadoopSparkGraphProvider extends AbstractGraphProvider {
put(Graph.GRAPH, HadoopGraph.class.getName());
put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, graphSONInput ? GraphSONInputFormat.class.getCanonicalName() : GryoInputFormat.class.getCanonicalName());
put(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GryoOutputFormat.class.getCanonicalName());
- put(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "spark-gremlin/target/test-output");
+ put(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
put(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
- put("mapred.reduce.tasks", 4);
+ put("mapreduce.job.reduces", 4);
/// spark configuration
put("spark.master", "local[4]");
put("spark.serializer", "org.apache.spark.serializer.KryoSerializer");