You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/19 17:40:27 UTC
[21/50] [abbrv] tinkerpop git commit: We now have
PartitionerInputFormat which allows a Hadoop-based GraphComputer to pull data
from any Graph implementation. PartitionerInputSplit is basically a wrapper
around Partition so that data access is data-local and ...
We now have PartitionerInputFormat which allows a Hadoop-based GraphComputer to pull data from any Graph implementation. PartitionerInputSplit is basically a wrapper around Partition so that data access is data-local and thus, for distributed Graph databases, it will simply call Partition.vertices() and turn them into VertexWritables. I have a test case that has SparkGraphComputer working over TinkerGraph. Pretty neato. Still lots more cleaning and work to do, but this is a good breather point.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/28097034
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/28097034
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/28097034
Branch: refs/heads/TINKERPOP-1564
Commit: 280970349473e40626e4a7502a0a8824bf944322
Parents: 825f515
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Dec 16 09:03:03 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Jan 19 10:26:57 2017 -0700
----------------------------------------------------------------------
CHANGELOG.asciidoc | 3 +-
.../gremlin/akka/process/actor/MasterActor.java | 2 +-
.../gremlin/akka/process/actor/WorkerActor.java | 2 +-
.../traversal/step/map/VertexProgramStep.java | 8 +-
.../computer/util/GraphComputerHelper.java | 23 +++
.../tinkerpop/gremlin/structure/Partition.java | 2 +-
.../gremlin/structure/Partitioner.java | 8 +
.../gremlin/structure/util/StringFactory.java | 2 +-
.../util/partitioner/GlobalPartitioner.java | 12 +-
.../util/partitioner/HashPartitioner.java | 12 +-
.../io/partitioner/PartitionerInputFormat.java | 61 ++++++++
.../io/partitioner/PartitionerInputSplit.java | 79 ++++++++++
.../io/partitioner/PartitionerRecordReader.java | 72 +++++++++
.../process/computer/SparkGraphComputer.java | 6 +-
...parkGraphPartitionerComputerProcessTest.java | 33 ++++
.../TinkerGraphPartitionerProvider.java | 155 +++++++++++++++++++
.../process/computer/TinkerGraphComputer.java | 14 +-
17 files changed, 467 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/28097034/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 510dc83..6d63e44 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -37,7 +37,8 @@ TinkerPop 3.3.0 (Release Date: NOT OFFICIALLY RELEASED YET)
* Added `ProcessTraversalStrategy` which is used to get cached strategies associated with a `Processor`.
* Deprecated `Computer` in favor of `GraphComputer.open()`.
* Deprecated `Graph.compute()` and `GraphComputer.submit()` in favor of `GraphComputer.submit(Graph)`.
->>>>>>> So much. withProcessor(Processor). No more Compute. Process.submit(Graph) as we are now staging it so that every Processor (GraphComputer/GraphAgents) can work over any Graph. This will all be via Partitioner. withComputer() deprecated. Lots of cool stuff with Process strategies -- ProcessorTraveralStrategy like VertexProgramStratgegy and ActorProgramStrategy work directly with TraversalStrategies.GlobalCache. So much other stuf... I forget. Check the CHANGELOG. This was a massive undertaking but, thank god, its all backwards compatible (though with deprecation).
+* Added `PartitionerInputFormat` which allows `SparkGraphComputer` and `GiraphGraphComputer` to pull data from any `Graph` implementation.
+* Added `PartitionerInputRDD` which allows `SparkGraphComputer` to pull data from any `Graph` implementation.
* Updated Docker build scripts to include Python dependencies (NOTE: users should remove any previously generated TinkerPop Docker images).
* Added "attachment requisite" `VertexProperty.element()` and `Property.element()` data in GraphSON serialization.
* Added `Vertex`, `Edge`, `VertexProperty`, and `Property` serializers to Gremlin-Python and exposed tests that use graph object arguments.
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/28097034/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
index a4ef639..aa31c28 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
@@ -61,7 +61,7 @@ public final class MasterActor extends AbstractActor implements RequiresMessageQ
this.workers = new ArrayList<>();
final List<Partition> partitions = partitioner.getPartitions();
for (final Partition partition : partitions) {
- final String workerPathString = "worker-" + partition.guid();
+ final String workerPathString = "worker-" + partition.id();
this.workers.add(new Address.Worker(workerPathString, partition.location()));
context().actorOf(Props.create(WorkerActor.class, program, this.master, partition, partitioner), workerPathString);
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/28097034/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
index 35b5a4f..27f942a 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
@@ -106,7 +106,7 @@ public final class WorkerActor extends AbstractActor implements RequiresMessageQ
}
private String createWorkerAddress(final Partition partition) {
- return "../worker-" + partition.guid();
+ return "../worker-" + partition.id();
}
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/28097034/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
index d005940..b02a725 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
@@ -64,7 +64,9 @@ public abstract class VertexProgramStep extends AbstractStep<ComputerResult, Com
if (this.first && this.getPreviousStep() instanceof EmptyStep) {
this.first = false;
final Graph graph = this.getTraversal().getGraph().get();
- future = this.getComputer().apply(graph).program(this.generateProgram(graph, EmptyMemory.instance())).submit();
+ future = (this.getComputer().getGraphComputerClass().equals(GraphComputer.class)) ?
+ this.getComputer().apply(graph).program(this.generateProgram(graph, EmptyMemory.instance())).submit() :
+ GraphComputer.open(this.getComputer().configuration()).program(this.generateProgram(graph, EmptyMemory.instance())).submit(graph);
final ComputerResult result = future.get();
this.processMemorySideEffects(result.memory());
return this.getTraversal().getTraverserGenerator().generate(result, this, 1l);
@@ -72,7 +74,9 @@ public abstract class VertexProgramStep extends AbstractStep<ComputerResult, Com
final Traverser.Admin<ComputerResult> traverser = this.starts.next();
final Graph graph = traverser.get().graph();
final Memory memory = traverser.get().memory();
- future = this.generateComputer(graph).program(this.generateProgram(graph, memory)).submit();
+ future = (this.getComputer().getGraphComputerClass().equals(GraphComputer.class)) ?
+ this.getComputer().apply(graph).program(this.generateProgram(graph, memory)).submit() :
+ GraphComputer.open(this.getComputer().configuration()).program(this.generateProgram(graph, memory)).submit(graph);
final ComputerResult result = future.get();
this.processMemorySideEffects(result.memory());
return traverser.split(result, this);
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/28097034/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/GraphComputerHelper.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/GraphComputerHelper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/GraphComputerHelper.java
index dce1934..840c5da 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/GraphComputerHelper.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/GraphComputerHelper.java
@@ -18,13 +18,16 @@
*/
package org.apache.tinkerpop.gremlin.process.computer.util;
+import org.apache.commons.configuration.Configuration;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.structure.Graph;
import java.lang.reflect.Method;
+import java.util.Iterator;
import java.util.Optional;
/**
@@ -68,6 +71,26 @@ public final class GraphComputerHelper {
return persist.isPresent() ? persist.get() : vertexProgram.isPresent() ? vertexProgram.get().getPreferredPersist() : GraphComputer.Persist.NOTHING;
}
+ public static GraphComputer configure(GraphComputer computer, final Configuration configuration) {
+ final Iterator<String> keys = configuration.getKeys();
+ while (keys.hasNext()) {
+ final String key = keys.next();
+ if (key.equals(GraphComputer.WORKERS))
+ computer = computer.workers(configuration.getInt(GraphComputer.WORKERS));
+ else if (key.equals(GraphComputer.RESULT))
+ computer = computer.result(GraphComputer.ResultGraph.valueOf(configuration.getString(GraphComputer.RESULT)));
+ else if (key.equals(GraphComputer.PERSIST))
+ computer = computer.persist(GraphComputer.Persist.valueOf(configuration.getString(GraphComputer.PERSIST)));
+ else if (key.equals(GraphComputer.VERTICES))
+ computer = computer.vertices((Traversal.Admin) configuration.getProperty(GraphComputer.VERTICES));
+ else if (key.equals(GraphComputer.EDGES))
+ computer = computer.edges((Traversal.Admin) configuration.getProperty(GraphComputer.EDGES));
+ else if (!key.equals(GraphComputer.GRAPH_COMPUTER))
+ computer = computer.configure(key, configuration.getProperty(key));
+ }
+ return computer;
+ }
+
public static boolean areEqual(final MapReduce a, final Object b) {
if (null == a)
throw Graph.Exceptions.argumentCanNotBeNull("a");
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/28097034/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java
index dbb5260..f20b9fb 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java
@@ -65,7 +65,7 @@ public interface Partition {
*
* @return the unique id of the partition
*/
- public UUID guid();
+ public String id();
/**
* Get the {@link InetAddress} of the locations physical location.
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/28097034/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partitioner.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partitioner.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partitioner.java
index 1d4aae1..2e2cdb7 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partitioner.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partitioner.java
@@ -30,4 +30,12 @@ public interface Partitioner {
public Partition getPartition(final Element element);
+    /**
+     * Looks up one of this partitioner's {@link Partition}s by its {@link Partition#id()}.
+     * <p>
+     * The default implementation performs a linear scan over {@link #getPartitions()}.
+     *
+     * @param id the id of the partition to find
+     * @return the first partition whose {@code id()} equals {@code id}
+     * @throws IllegalArgumentException if no partition has the given id (the message names the missing id)
+     */
+    public default Partition getPartition(final String id) {
+        for (final Partition partition : this.getPartitions()) {
+            if (partition.id().equals(id))
+                return partition;
+        }
+        throw new IllegalArgumentException("The provided partition does not exist in the partitioner: " + id);
+    }
+
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/28097034/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java
index 61d9551..f7c350d 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java
@@ -150,7 +150,7 @@ public final class StringFactory {
}
public static String partitionString(final Partition partition) {
- return "partition" + L_BRACKET + partition.location().getHostAddress() + COLON + partition.guid() + R_BRACKET;
+ return "partition" + L_BRACKET + partition.location().getHostAddress() + COLON + partition.id() + R_BRACKET;
}
public static String traversalSourceString(final TraversalSource traversalSource) {
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/28097034/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java
index 361750b..1d72a2d 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java
@@ -32,7 +32,6 @@ import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import java.util.UUID;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -63,11 +62,12 @@ public final class GlobalPartitioner implements Partitioner {
private class GlobalPartition implements Partition {
private final Graph graph;
- private final UUID guid = UUID.randomUUID();
+ private final String id;
private final InetAddress location;
private GlobalPartition(final Graph graph) {
this.graph = graph;
+ this.id = this.graph.toString();
try {
this.location = InetAddress.getLocalHost();
} catch (final UnknownHostException e) {
@@ -97,17 +97,17 @@ public final class GlobalPartitioner implements Partitioner {
@Override
public boolean equals(final Object other) {
- return other instanceof Partition && ((Partition) other).guid().equals(this.guid);
+ return other instanceof Partition && ((Partition) other).id().equals(this.id);
}
@Override
public int hashCode() {
- return this.guid.hashCode() + this.location.hashCode();
+ return this.id.hashCode() + this.location.hashCode();
}
@Override
- public UUID guid() {
- return this.guid;
+ public String id() {
+ return this.id;
}
@Override
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/28097034/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/HashPartitioner.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/HashPartitioner.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/HashPartitioner.java
index b3d3db7..15b4563 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/HashPartitioner.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/HashPartitioner.java
@@ -31,7 +31,6 @@ import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import java.util.UUID;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -72,12 +71,13 @@ public final class HashPartitioner implements Partitioner {
private final Partition basePartition;
private final int totalSplits;
private final int splitId;
- private final UUID guid = UUID.randomUUID();
+ private final String id;
private HashPartition(final Partition basePartition, final int splitId, final int totalSplits) {
this.basePartition = basePartition;
this.totalSplits = totalSplits;
this.splitId = splitId;
+ this.id = this.basePartition.id() + "#" + splitId;
}
@Override
@@ -102,17 +102,17 @@ public final class HashPartitioner implements Partitioner {
@Override
public boolean equals(final Object other) {
- return other instanceof Partition && ((Partition) other).guid().equals(this.guid);
+ return other instanceof Partition && ((Partition) other).id().equals(this.id);
}
@Override
public int hashCode() {
- return this.guid.hashCode() + this.location().hashCode();
+ return this.id.hashCode() + this.location().hashCode();
}
@Override
- public UUID guid() {
- return this.guid;
+ public String id() {
+ return this.id;
}
@Override
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/28097034/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerInputFormat.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerInputFormat.java
new file mode 100644
index 0000000..2dae040
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerInputFormat.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.structure.io.partitioner;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Partition;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An {@link InputFormat} that reads vertices out of any {@link Graph} through its
+ * {@link org.apache.tinkerpop.gremlin.structure.Partitioner}: every {@link Partition} becomes one
+ * {@link PartitionerInputSplit}, and each split is read by a {@link PartitionerRecordReader}.
+ * <p>
+ * The graph is opened from the Hadoop job configuration via {@link GraphFactory#open} whenever splits are computed.
+ * NOTE(review): that graph instance is never closed here — TODO confirm whether this leaks resources for providers
+ * that hold connections.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class PartitionerInputFormat extends InputFormat<NullWritable, VertexWritable> {
+
+    /**
+     * Produces one {@link PartitionerInputSplit} per partition reported by the graph's partitioner, in the order
+     * the partitioner lists them.
+     */
+    @Override
+    public List<InputSplit> getSplits(final JobContext jobContext) throws IOException, InterruptedException {
+        final Graph graph = GraphFactory.open(ConfUtil.makeApacheConfiguration(jobContext.getConfiguration()));
+        final List<Partition> partitions = graph.partitioner().getPartitions();
+        final List<InputSplit> splits = new ArrayList<>(partitions.size());
+        partitions.forEach(partition -> splits.add(new PartitionerInputSplit(partition)));
+        return splits;
+    }
+
+    /**
+     * Creates a {@link PartitionerRecordReader} and initializes it eagerly against {@code inputSplit}.
+     * <p>
+     * NOTE(review): frameworks that drive an {@link InputFormat} usually call {@code initialize()} on the returned
+     * reader themselves; if so, the graph is opened twice per split — confirm whether eager initialization is still
+     * needed by callers that skip that step.
+     */
+    @Override
+    public RecordReader<NullWritable, VertexWritable> createRecordReader(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+        final PartitionerRecordReader recordReader = new PartitionerRecordReader();
+        recordReader.initialize(inputSplit, taskAttemptContext);
+        return recordReader;
+    }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/28097034/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerInputSplit.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerInputSplit.java
new file mode 100644
index 0000000..923f8b3
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerInputSplit.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.structure.io.partitioner;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.tinkerpop.gremlin.structure.Partition;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A Hadoop {@link InputSplit} that identifies a single graph {@link Partition} by its id and host address.
+ * <p>
+ * Only the partition's id and location (as a host address string) are retained, not the partition itself, so the
+ * split can be serialized as a {@link Writable} and the partition looked up again by id on the reading side.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class PartitionerInputSplit extends InputSplit implements Writable {
+
+    private String location;
+    private String id;
+
+    /**
+     * No-arg constructor required so the split can be instantiated before {@link #readFields(DataInput)} fills it.
+     */
+    public PartitionerInputSplit() {
+    }
+
+    /**
+     * Captures the host address and id of the given partition.
+     *
+     * @param partition the partition this split stands for
+     */
+    public PartitionerInputSplit(final Partition partition) {
+        this.location = partition.location().getHostAddress();
+        this.id = partition.id();
+    }
+
+    /**
+     * Reports a constant nominal length of {@code 1}; the partition's real size is not computed here.
+     */
+    @Override
+    public long getLength() throws IOException, InterruptedException {
+        return 1L;
+    }
+
+    /**
+     * @return a single-element array holding the partition's host address
+     */
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
+        final String[] hosts = {this.location};
+        return hosts;
+    }
+
+    /**
+     * @return the id of the partition this split refers to
+     */
+    public String getPartitionId() {
+        return this.id;
+    }
+
+    @Override
+    public void write(final DataOutput out) throws IOException {
+        // the field order here must mirror readFields()
+        WritableUtils.writeString(out, this.location);
+        WritableUtils.writeString(out, this.id);
+    }
+
+    @Override
+    public void readFields(final DataInput in) throws IOException {
+        this.location = WritableUtils.readString(in);
+        this.id = WritableUtils.readString(in);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("%s::%s", this.location, this.id);
+    }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/28097034/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerRecordReader.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerRecordReader.java
new file mode 100644
index 0000000..c6de8ab
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerRecordReader.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.structure.io.partitioner;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Reads the vertices of a single graph {@link org.apache.tinkerpop.gremlin.structure.Partition} and exposes each
+ * one as a {@link VertexWritable} value with a {@link NullWritable} key.
+ * <p>
+ * The partition is resolved by opening the graph from the task configuration and asking its partitioner for the
+ * partition whose id is carried by the {@link PartitionerInputSplit}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class PartitionerRecordReader extends RecordReader<NullWritable, VertexWritable> {
+
+    private Iterator<Vertex> vertices;
+    // the record produced by the most recent nextKeyValue(); null before the first call and after exhaustion
+    private VertexWritable currentValue;
+
+    @Override
+    public void initialize(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+        // NOTE(review): the graph opened here is never closed — TODO confirm whether close() should release it
+        final Graph graph = GraphFactory.open(ConfUtil.makeApacheConfiguration(taskAttemptContext.getConfiguration()));
+        this.vertices = graph.partitioner().getPartition(((PartitionerInputSplit) inputSplit).getPartitionId()).vertices();
+        this.currentValue = null;
+    }
+
+    /**
+     * Advances to the next vertex of the partition.
+     * <p>
+     * The underlying iterator is advanced here, and only here, so that {@link #getCurrentValue()} may be called any
+     * number of times per record without skipping vertices.
+     *
+     * @return {@code true} if a new record is available, {@code false} once the partition is exhausted
+     */
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+        if (!this.vertices.hasNext()) {
+            this.currentValue = null;
+            return false;
+        }
+        this.currentValue = new VertexWritable(this.vertices.next());
+        return true;
+    }
+
+    /**
+     * @return the singleton {@link NullWritable}; records carry no meaningful key
+     */
+    @Override
+    public NullWritable getCurrentKey() throws IOException, InterruptedException {
+        return NullWritable.get();
+    }
+
+    /**
+     * @return the vertex read by the last successful {@link #nextKeyValue()}, or {@code null} if there is none
+     */
+    @Override
+    public VertexWritable getCurrentValue() throws IOException, InterruptedException {
+        return this.currentValue;
+    }
+
+    /**
+     * Progress is not tracked; always reports {@code 0}.
+     */
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+        return 0;
+    }
+
+    @Override
+    public void close() throws IOException {
+        // nothing is held open by this reader itself (see the note in initialize())
+    }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/28097034/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 61c9663..cbcdfe7 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -53,6 +53,7 @@ import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
+import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper;
import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
@@ -143,8 +144,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
@Override
public Future<ComputerResult> submit(final Graph graph) {
- this.hadoopGraph = (HadoopGraph) graph;
- ConfigurationUtils.copy(this.hadoopGraph.configuration(), this.sparkConfiguration);
+ ConfigurationUtils.copy(graph.configuration(), this.sparkConfiguration);
return this.submit();
}
@@ -154,7 +154,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
}
public static SparkGraphComputer open(final org.apache.commons.configuration.Configuration configuration) {
- return HadoopGraph.open(configuration).compute(SparkGraphComputer.class);
+ return new SparkGraphComputer(HadoopGraph.open(configuration));
}
private Future<ComputerResult> submitWithExecutor(Executor exec) {
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/28097034/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/partitioner/SparkGraphPartitionerComputerProcessTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/partitioner/SparkGraphPartitionerComputerProcessTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/partitioner/SparkGraphPartitionerComputerProcessTest.java
new file mode 100644
index 0000000..7c5b3e3
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/partitioner/SparkGraphPartitionerComputerProcessTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.partitioner;
+
+import org.apache.tinkerpop.gremlin.GraphProviderClass;
+import org.apache.tinkerpop.gremlin.process.ProcessComputerSuite;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+import org.junit.runner.RunWith;
+
+/**
+ * Runs the {@link ProcessComputerSuite} against {@link TinkerGraph}, with graph instances supplied by
+ * {@link TinkerGraphPartitionerProvider}.
+ * <p>
+ * NOTE(review): presumably the provider routes execution through {@code SparkGraphComputer} reading the TinkerGraph
+ * via {@code PartitionerInputFormat} (both are imported by the provider) — confirm against the provider.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+@RunWith(ProcessComputerSuite.class)
+@GraphProviderClass(provider = TinkerGraphPartitionerProvider.class, graph = TinkerGraph.class)
+public class SparkGraphPartitionerComputerProcessTest {
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/28097034/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/partitioner/TinkerGraphPartitionerProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/partitioner/TinkerGraphPartitionerProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/partitioner/TinkerGraphPartitionerProvider.java
new file mode 100644
index 0000000..941c942
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/partitioner/TinkerGraphPartitionerProvider.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.partitioner;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.AbstractGraphProvider;
+import org.apache.tinkerpop.gremlin.AbstractGremlinTest;
+import org.apache.tinkerpop.gremlin.GraphProvider;
+import org.apache.tinkerpop.gremlin.LoadGraphWith;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.partitioner.PartitionerInputFormat;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.VertexProperty;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerEdge;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerElement;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraphVariables;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerProperty;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertex;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertexProperty;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A {@link GraphProvider} that runs the process-computer tests with {@link SparkGraphComputer}, reading its input
+ * from a {@link TinkerGraph} through {@link PartitionerInputFormat} and writing with {@link GryoOutputFormat}.
+ * <p>
+ * Test data is not read by this provider. Instead, {@link #getBaseConfiguration} sets TinkerGraph's
+ * graph-location and graph-format keys to the Gryo resource of the requested {@link LoadGraphWith.GraphData}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+@GraphProvider.Descriptor(computer = SparkGraphComputer.class)
+public class TinkerGraphPartitionerProvider extends AbstractGraphProvider {
+
+    /**
+     * Configuration key recording whether the current test should bypass Spark and use the default computer.
+     */
+    private static final String SKIP_TEST = "skipTest";
+
+    /**
+     * Test method names (or canonical test class names) that run with the graph's default computer rather than
+     * {@link SparkGraphComputer}.
+     */
+    private static final Set<String> SKIP_TESTS = new HashSet<>(Arrays.asList(
+            "testProfileStrategyCallback",
+            "testProfileStrategyCallbackSideEffect",
+            "shouldSucceedWithProperTraverserRequirements",
+            "shouldFailWithImproperTraverserRequirements"));
+
+    /**
+     * The TinkerGraph structure classes reported by {@link #getImplementations()}.
+     */
+    private static final Set<Class> IMPLEMENTATION = new HashSet<>(Arrays.<Class>asList(
+            TinkerEdge.class,
+            TinkerElement.class,
+            TinkerGraph.class,
+            TinkerGraphVariables.class,
+            TinkerProperty.class,
+            TinkerVertex.class,
+            TinkerVertexProperty.class));
+
+    /**
+     * Builds the TinkerGraph + SparkGraphComputer configuration for a single test.
+     *
+     * @param graphName      name of the graph being configured (unused here)
+     * @param test           the test class; its canonical name is checked against {@code SKIP_TESTS}
+     * @param testMethodName the test method; checked against {@code SKIP_TESTS}
+     * @param loadGraphWith  the data set to load, or {@code null} when the test requests none
+     * @return a fresh, mutable configuration map
+     * @throws IllegalStateException if {@code loadGraphWith} has no id manager mapping or its resource is missing
+     */
+    @Override
+    public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName,
+                                                    final LoadGraphWith.GraphData loadGraphWith) {
+        // selectIdMakerFromGraphData already yields ANY when no data set is requested, so no further fallback is needed
+        final String idMaker = selectIdMakerFromGraphData(loadGraphWith).name();
+        final boolean skipTest = SKIP_TESTS.contains(testMethodName) || SKIP_TESTS.contains(test.getCanonicalName());
+
+        final Map<String, Object> configuration = new HashMap<>();
+        configuration.put(Graph.GRAPH, TinkerGraph.class.getName());
+        configuration.put(TinkerGraph.GREMLIN_TINKERGRAPH_VERTEX_ID_MANAGER, idMaker);
+        configuration.put(TinkerGraph.GREMLIN_TINKERGRAPH_EDGE_ID_MANAGER, idMaker);
+        configuration.put(TinkerGraph.GREMLIN_TINKERGRAPH_VERTEX_PROPERTY_ID_MANAGER, idMaker);
+        configuration.put(SKIP_TEST, skipTest);
+        if (loadGraphWith == LoadGraphWith.GraphData.CREW)
+            configuration.put(TinkerGraph.GREMLIN_TINKERGRAPH_DEFAULT_VERTEX_PROPERTY_CARDINALITY, VertexProperty.Cardinality.list.name());
+        configuration.put("spark.master", "local[4]");
+        configuration.put("spark.serializer", GryoSerializer.class.getCanonicalName());
+        configuration.put("spark.kryo.registrationRequired", true);
+        configuration.put(Constants.GREMLIN_HADOOP_GRAPH_READER, PartitionerInputFormat.class.getCanonicalName());
+        configuration.put(Constants.GREMLIN_HADOOP_GRAPH_WRITER, GryoOutputFormat.class.getCanonicalName());
+        configuration.put(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, getWorkingDirectory());
+        configuration.put(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        configuration.put(GraphComputer.GRAPH_COMPUTER, SparkGraphComputer.class.getCanonicalName());
+        configuration.put(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+        configuration.put(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_FORMAT, "gryo");
+        // tests without @LoadGraphWith pass null here; only point TinkerGraph at a data file when one was requested
+        if (null != loadGraphWith)
+            configuration.put(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_LOCATION, resolveGraphLocation(loadGraphWith));
+        return configuration;
+    }
+
+    /**
+     * Locates the classpath resource for {@code loadGraphWith} and strips the {@code file:} scheme from its URL.
+     *
+     * @throws IllegalStateException if the resource is not on the classpath
+     */
+    private static String resolveGraphLocation(final LoadGraphWith.GraphData loadGraphWith) {
+        final URL resource = AbstractGremlinTest.class.getResource(loadGraphWith.location());
+        if (null == resource)
+            throw new IllegalStateException(String.format("Could not find the %s graph data at %s", loadGraphWith.name(), loadGraphWith.location()));
+        return resource.toString().replace("file:", "");
+    }
+
+    /**
+     * Intentionally a no-op: graph data is handed to TinkerGraph through the graph-location setting built in
+     * {@link #getBaseConfiguration}, so nothing is read here.
+     */
+    protected void readIntoGraph(final Graph g, final String path) throws IOException {
+        // NOTE(review): presumably overrides AbstractGraphProvider's loader to suppress it — confirm and add @Override
+    }
+
+    /**
+     * Intentionally a no-op: the graph is not closed here.
+     */
+    @Override
+    public void clear(final Graph graph, final Configuration configuration) throws Exception {
+        // NOTE(review): closing was previously commented out; presumably because TinkerGraph may write back to its
+        // configured graph location (the shared test resource) on close — confirm before re-enabling.
+    }
+
+    @Override
+    public Set<Class> getImplementations() {
+        return IMPLEMENTATION;
+    }
+
+    /**
+     * Chooses the TinkerGraph id manager for a test data set, whose id type is known in advance.
+     *
+     * @param loadGraphWith the requested data set, or {@code null} for none
+     * @return {@code ANY} when no data set is requested, otherwise the id manager matching that data set
+     * @throws IllegalStateException for a data set with no mapping defined here
+     */
+    protected TinkerGraph.DefaultIdManager selectIdMakerFromGraphData(final LoadGraphWith.GraphData loadGraphWith) {
+        if (null == loadGraphWith) return TinkerGraph.DefaultIdManager.ANY;
+        switch (loadGraphWith) {
+            case CLASSIC:
+            case MODERN:
+            case CREW:
+            case GRATEFUL:
+                return TinkerGraph.DefaultIdManager.INTEGER;
+            default:
+                throw new IllegalStateException(String.format("Need to define a new %s for %s", TinkerGraph.IdManager.class.getName(), loadGraphWith.name()));
+        }
+    }
+
+    /**
+     * Returns a traversal source that uses the default computer for skipped tests and {@link SparkGraphComputer}
+     * otherwise. A graph whose configuration lacks the skip flag is treated as not skipped.
+     */
+    @Override
+    public GraphTraversalSource traversal(final Graph graph) {
+        return graph.configuration().getBoolean(SKIP_TEST, false) ?
+                graph.traversal().withComputer() :
+                graph.traversal().withProcessor(SparkGraphComputer.open(graph.configuration()));
+    }
+
+    @Override
+    public GraphComputer getGraphComputer(final Graph graph) {
+        return SparkGraphComputer.open(graph.configuration());
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/28097034/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index 82cb934..cbbe53f 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -69,7 +69,7 @@ public final class TinkerGraphComputer implements GraphComputer {
private Persist persist = null;
private VertexProgram<?> vertexProgram;
- private final TinkerGraph graph;
+ private TinkerGraph graph;
private TinkerMemory memory;
private final TinkerMessageBoard messageBoard = new TinkerMessageBoard();
private boolean executed = false;
@@ -97,6 +97,7 @@ public final class TinkerGraphComputer implements GraphComputer {
this.graph = null;
this.configuration = configuration;
this.configuration.setProperty(GRAPH_COMPUTER, TinkerGraphComputer.class.getCanonicalName());
+ GraphComputerHelper.configure(this, ConfigurationUtils.cloneConfiguration(configuration));
}
public static TinkerGraphComputer open(final Configuration configuration) {
@@ -104,10 +105,7 @@ public final class TinkerGraphComputer implements GraphComputer {
}
public static TinkerGraphComputer open() {
- final BaseConfiguration configuration = new BaseConfiguration();
- configuration.setDelimiterParsingDisabled(true);
- configuration.setProperty(GRAPH_COMPUTER, TinkerGraphComputer.class.getCanonicalName());
- return new TinkerGraphComputer(configuration);
+ // GRAPH_COMPUTER is now set by the constructor; only the delimiter setting must still be applied here.
+ final BaseConfiguration configuration = new BaseConfiguration();
+ // keep delimiter parsing disabled so string values containing commas are not split into lists
+ configuration.setDelimiterParsingDisabled(true);
+ return new TinkerGraphComputer(configuration);
}
@Override
@@ -164,6 +162,12 @@ public final class TinkerGraphComputer implements GraphComputer {
}
@Override
+ public Future<ComputerResult> submit(final Graph graph) {
+ // bind this computer to the supplied graph (a non-TinkerGraph argument fails the cast), then take the normal submit path
+ final TinkerGraph tinkerGraph = (TinkerGraph) graph;
+ this.graph = tinkerGraph;
+ return submit();
+ }
+
+ @Override
public Future<ComputerResult> submit() {
// a graph computer can only be executed once
if (this.executed)