You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/24 21:59:56 UTC
incubator-tinkerpop git commit: SparkVertex is now smart about memory
and uses GryoPool.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master 95a33cf7e -> 8f5165834
SparkVertex is now smart about memory and uses GryoPool.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/8f516583
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/8f516583
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/8f516583
Branch: refs/heads/master
Commit: 8f5165834c8e3d253dbb327b175564af0fa3e359
Parents: 95a33cf
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Mar 24 14:59:53 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Mar 24 14:59:53 2015 -0600
----------------------------------------------------------------------
.../process/computer/spark/SparkVertex.java | 87 ++++----------------
.../hadoop/structure/io/VertexWritable.java | 13 ++-
.../gremlin/hadoop/HadoopGraphProvider.java | 2 +-
3 files changed, 28 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8f516583/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java
index 872fd28..8bd1a0a 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java
@@ -18,23 +18,16 @@
*/
package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoWriter;
import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
-import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertex;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Iterator;
@@ -43,122 +36,72 @@ import java.util.Iterator;
*/
public final class SparkVertex implements Vertex, Serializable {
- private static GryoWriter GRYO_WRITER = GryoWriter.build().create();
- private static GryoReader GRYO_READER = GryoReader.build().create();
-
// TODO: Wrapped vertex -- need VertexProgram in partition (broadcast variable?)
- private final Object vertexId;
- private transient TinkerVertex vertex;
- private byte[] vertexBytes;
+ private final VertexWritable vertexWritable;
public SparkVertex(final TinkerVertex vertex) {
- this.vertex = vertex;
- this.vertexId = vertex.id();
+ this.vertexWritable = new VertexWritable(vertex);
}
@Override
public Edge addEdge(final String label, final Vertex inVertex, final Object... keyValues) {
- return this.vertex.addEdge(label, inVertex, keyValues);
+ return this.vertexWritable.get().addEdge(label, inVertex, keyValues);
}
@Override
public Object id() {
- return this.vertexId;
+ return this.vertexWritable.get().id();
}
@Override
public String label() {
- return this.vertex.label();
+ return this.vertexWritable.get().label();
}
@Override
public Graph graph() {
- return this.vertex.graph();
+ return this.vertexWritable.get().graph();
}
@Override
public <V> VertexProperty<V> property(final VertexProperty.Cardinality cardinality, final String key, final V value, final Object... keyValues) {
- return this.vertex.property(cardinality, key, value, keyValues);
+ return this.vertexWritable.get().property(cardinality, key, value, keyValues);
}
@Override
public void remove() {
- this.vertex.remove();
+ this.vertexWritable.get().remove();
}
@Override
public Iterator<Edge> edges(final Direction direction, final String... edgeLabels) {
- return this.vertex.edges(direction, edgeLabels);
+ return this.vertexWritable.get().edges(direction, edgeLabels);
}
@Override
public Iterator<Vertex> vertices(final Direction direction, final String... edgeLabels) {
- return this.vertex.vertices(direction, edgeLabels);
+ return this.vertexWritable.get().vertices(direction, edgeLabels);
}
@Override
public <V> Iterator<VertexProperty<V>> properties(final String... propertyKeys) {
- return this.vertex.properties(propertyKeys);
+ return this.vertexWritable.get().properties(propertyKeys);
}
@Override
public String toString() {
- return StringFactory.vertexString(this);
+ return StringFactory.vertexString(this.vertexWritable.get());
}
@Override
public int hashCode() {
- return this.vertexId.hashCode();
+ return this.vertexWritable.get().hashCode();
}
@Override
public boolean equals(final Object other) {
- return ElementHelper.areEqual(this, other);
- }
-
- ///////////////////////////////
-
- private void writeObject(final ObjectOutputStream outputStream) throws IOException {
- this.deflateVertex();
- outputStream.defaultWriteObject();
- }
-
- private void readObject(final ObjectInputStream inputStream) throws IOException, ClassNotFoundException {
- inputStream.defaultReadObject();
- this.inflateVertex();
- }
-
- public final void inflateVertex() {
- if (null != this.vertex)
- return;
-
- try {
- final ByteArrayInputStream bis = new ByteArrayInputStream(this.vertexBytes);
- final TinkerGraph tinkerGraph = TinkerGraph.open();
- GRYO_READER.readGraph(bis, tinkerGraph);
- bis.close();
- this.vertexBytes = null;
- this.vertex = (TinkerVertex) tinkerGraph.vertices(this.vertexId).next();
- } catch (final IOException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
-
- private final void deflateVertex() {
- if (null != this.vertexBytes)
- return;
-
- try {
- final ByteArrayOutputStream bos = new ByteArrayOutputStream();
- GRYO_WRITER.writeGraph(bos, this.vertex.graph());
- bos.flush();
- bos.close();
- this.vertex = null;
- this.vertexBytes = bos.toByteArray();
- } catch (final IOException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
+ return ElementHelper.areEqual(this.vertexWritable.get(), other);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8f516583/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java
index bd8aa07..92d10b7 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java
@@ -34,11 +34,14 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class VertexWritable implements Writable {
+public final class VertexWritable implements Writable, Serializable {
private Vertex vertex;
@@ -102,6 +105,14 @@ public final class VertexWritable implements Writable {
}
}
+ private void writeObject(final ObjectOutputStream outputStream) throws IOException {
+ this.write(outputStream);
+ }
+
+ private void readObject(final ObjectInputStream inputStream) throws IOException, ClassNotFoundException {
+ this.readFields(inputStream);
+ }
+
@Override
public boolean equals(final Object other) {
return other instanceof VertexWritable && ElementHelper.areEqual(this.vertex, ((VertexWritable) other).get());
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8f516583/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
index 1cf2370..e084d3d 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
@@ -121,7 +121,7 @@ public class HadoopGraphProvider extends AbstractGraphProvider {
/// spark configuration
put("spark.master", "local[4]");
- put("spark.serializer", "org.apache.spark.serializer.JavaSerializer");
+ put("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
}};
}