You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/12 19:59:46 UTC
incubator-tinkerpop git commit: Optimizations to
VertexStreamIterator, and no longer using static XXXReader/XXXWriter
instances with Hadoop InputFormats.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master c226ea285 -> 53edaae12
Optimizations to VertexStreamIterator, and no longer using static XXXReader/XXXWriter instances with Hadoop InputFormats.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/53edaae1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/53edaae1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/53edaae1
Branch: refs/heads/master
Commit: 53edaae1231b743870227bb77ce729578184a38b
Parents: c226ea2
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Mar 12 12:59:32 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Mar 12 12:59:41 2015 -0600
----------------------------------------------------------------------
.../traversal/step/sideEffect/ProfileTest.java | 4 +-
.../computer/giraph/GiraphComputeVertex.java | 18 ++++--
.../hadoop/structure/io/VertexWritable.java | 4 +-
.../io/graphson/GraphSONRecordReader.java | 2 +-
.../io/graphson/GraphSONRecordWriter.java | 2 +-
.../structure/io/gryo/VertexStreamIterator.java | 63 ++++++++------------
6 files changed, 45 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/53edaae1/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/ProfileTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/ProfileTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/ProfileTest.java
index 4ed5954..a5bc114 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/ProfileTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/ProfileTest.java
@@ -140,14 +140,14 @@ public abstract class ProfileTest extends AbstractGremlinProcessTest {
assertTrue("Duration should be at least the length of the sleep: " + metrics.getDuration(TimeUnit.MILLISECONDS),
metrics.getDuration(TimeUnit.MILLISECONDS) >= 60);
assertTrue("Check that duration is within tolerant range: " + metrics.getDuration(TimeUnit.MILLISECONDS),
- metrics.getDuration(TimeUnit.MILLISECONDS) < 80);
+ metrics.getDuration(TimeUnit.MILLISECONDS) < 100);
// 6 elements w/ a 5ms sleep each = 30ms plus 20ms for other computation
metrics = traversalMetrics.getMetrics(2);
assertTrue("Duration should be at least the length of the sleep: " + metrics.getDuration(TimeUnit.MILLISECONDS),
metrics.getDuration(TimeUnit.MILLISECONDS) >= 30);
assertTrue("Check that duration is within tolerant range: " + metrics.getDuration(TimeUnit.MILLISECONDS),
- metrics.getDuration(TimeUnit.MILLISECONDS) < 50);
+ metrics.getDuration(TimeUnit.MILLISECONDS) < 100);
double totalPercentDuration = 0;
for (Metrics m : traversalMetrics.getMetrics()) {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/53edaae1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
index f2407fe..670e239 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
@@ -57,14 +57,14 @@ public final class GiraphComputeVertex extends Vertex<LongWritable, Text, NullWr
private static final String VERTEX_ID = Graph.Hidden.hide("giraph.gremlin.vertexId");
private TinkerVertex tinkerVertex;
private StrategyVertex wrappedVertex;
- private static GryoWriter KRYO_WRITER = GryoWriter.build().create();
- private static GryoReader KRYO_READER = GryoReader.build().create();
+ private GryoWriter KRYO_WRITER;
+ private GryoReader KRYO_READER;
public GiraphComputeVertex() {
}
public GiraphComputeVertex(final org.apache.tinkerpop.gremlin.structure.Vertex vertex) {
- this.tinkerVertex = GiraphComputeVertex.generateTinkerVertexForm(vertex);
+ this.tinkerVertex = this.generateTinkerVertexForm(vertex);
this.tinkerVertex.graph().variables().set(VERTEX_ID, this.tinkerVertex.id());
this.initialize(new LongWritable(Long.valueOf(this.tinkerVertex.id().toString())), this.deflateTinkerVertex(), EmptyOutEdges.instance());
}
@@ -95,8 +95,16 @@ public final class GiraphComputeVertex extends Vertex<LongWritable, Text, NullWr
///////////////////////////////////////////////
+ private void checkReaderWriters() {
+ if (null == KRYO_READER)
+ KRYO_READER = GryoReader.build().create();
+ if (null == KRYO_WRITER)
+ KRYO_WRITER = GryoWriter.build().create();
+ }
+
private Text deflateTinkerVertex() {
try {
+ checkReaderWriters();
final ByteArrayOutputStream bos = new ByteArrayOutputStream();
KRYO_WRITER.writeGraph(bos, this.tinkerVertex.graph());
bos.flush();
@@ -109,6 +117,7 @@ public final class GiraphComputeVertex extends Vertex<LongWritable, Text, NullWr
private void inflateTinkerVertex() {
try {
+ checkReaderWriters();
final ByteArrayInputStream bis = new ByteArrayInputStream(this.getValue().getBytes());
final TinkerGraph tinkerGraph = TinkerGraph.open();
KRYO_READER.readGraph(bis, tinkerGraph);
@@ -119,11 +128,12 @@ public final class GiraphComputeVertex extends Vertex<LongWritable, Text, NullWr
}
}
- private static final TinkerVertex generateTinkerVertexForm(final org.apache.tinkerpop.gremlin.structure.Vertex otherVertex) {
+ private final TinkerVertex generateTinkerVertexForm(final org.apache.tinkerpop.gremlin.structure.Vertex otherVertex) {
if (otherVertex instanceof TinkerVertex)
return (TinkerVertex) otherVertex;
else {
try {
+ checkReaderWriters();
final ByteArrayOutputStream bos = new ByteArrayOutputStream();
KRYO_WRITER.writeVertex(bos, otherVertex, Direction.BOTH);
bos.flush();
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/53edaae1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java
index a42060c..bbf942d 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java
@@ -42,8 +42,8 @@ import java.io.IOException;
public final class VertexWritable<V extends Vertex> implements Writable {
private Vertex vertex;
- private static final GryoWriter KRYO_WRITER = GryoWriter.build().create();
- private static final GryoReader KRYO_READER = GryoReader.build().create();
+ private final GryoWriter KRYO_WRITER = GryoWriter.build().create();
+ private final GryoReader KRYO_READER = GryoReader.build().create();
public VertexWritable(final Vertex vertex) {
this.vertex = vertex;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/53edaae1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordReader.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordReader.java
index 95f5b46..ad082bf 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordReader.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordReader.java
@@ -43,7 +43,7 @@ import java.util.function.Function;
*/
public class GraphSONRecordReader extends RecordReader<NullWritable, VertexWritable> {
- private static final GraphSONReader GRAPHSON_READER = GraphSONReader.build().create();
+ private final GraphSONReader GRAPHSON_READER = GraphSONReader.build().create();
private final VertexWritable vertex = new VertexWritable(null);
private final LineRecordReader lineRecordReader;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/53edaae1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordWriter.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordWriter.java
index 57ae456..d8f50b7 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordWriter.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordWriter.java
@@ -36,7 +36,7 @@ public class GraphSONRecordWriter extends RecordWriter<NullWritable, VertexWrita
private static final String UTF8 = "UTF-8";
private static final byte[] NEWLINE;
private final DataOutputStream out;
- private static final GraphSONWriter GRAPHSON_WRITER = GraphSONWriter.build().create();
+ private final GraphSONWriter GRAPHSON_WRITER = GraphSONWriter.build().create();
static {
try {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/53edaae1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/VertexStreamIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/VertexStreamIterator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/VertexStreamIterator.java
index b32a944..dcd5a49 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/VertexStreamIterator.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/VertexStreamIterator.java
@@ -43,18 +43,14 @@ public class VertexStreamIterator implements Iterator<VertexWritable> {
// this is VertexTerminator's long terminal 4185403236219066774L as an array of positive int's
private static final int[] TERMINATOR = new int[]{58, 21, 138, 17, 112, 155, 153, 150};
- private static int BUFLEN = TERMINATOR.length;
-
private final InputStream inputStream;
- private static final GryoReader GRYO_READER = GryoReader.build().create();
+ private final GryoReader GRYO_READER = GryoReader.build().create();
private final ByteArrayOutputStream output = new ByteArrayOutputStream();
- private final int[] buffer = new int[BUFLEN];
- private int len;
private int currentByte;
+ private long currentTotalLength = 0;
private Vertex currentVertex;
private final long maxLength;
- private long currentLength = 0;
public VertexStreamIterator(final InputStream inputStream, final long maxLength) {
this.inputStream = inputStream;
@@ -62,18 +58,18 @@ public class VertexStreamIterator implements Iterator<VertexWritable> {
}
public float getProgress() {
- if (0 == this.currentLength || 0 == this.maxLength)
+ if (0 == this.currentTotalLength || 0 == this.maxLength)
return 0.0f;
- else if (this.currentLength >= this.maxLength || this.maxLength == Long.MAX_VALUE)
+ else if (this.currentTotalLength >= this.maxLength || this.maxLength == Long.MAX_VALUE)
return 1.0f;
else
- return (float) this.currentLength / (float) this.maxLength;
+ return (float) this.currentTotalLength / (float) this.maxLength;
}
@Override
public boolean hasNext() {
- if (this.currentLength >= this.maxLength) // gone beyond the split boundary
+ if (this.currentTotalLength >= this.maxLength) // gone beyond the split boundary
return false;
if (null != this.currentVertex)
return true;
@@ -101,45 +97,36 @@ public class VertexStreamIterator implements Iterator<VertexWritable> {
return new VertexWritable(this.currentVertex);
} finally {
this.currentVertex = null;
- this.len = 0;
this.output.reset();
}
}
private final Vertex advanceToNextVertex() throws IOException {
+ long currentVertexLength = 0;
+ int terminatorLocation = 0;
while (true) {
this.currentByte = this.inputStream.read();
- this.currentLength++;
if (-1 == this.currentByte) {
- if (this.len > 0) {
+ if (currentVertexLength > 0)
throw new IllegalStateException("Remainder of stream exhausted without matching a vertex");
- } else {
+ else
return null;
- }
}
-
- if (this.len >= BUFLEN)
- this.output.write(this.buffer[this.len % BUFLEN]);
-
- this.buffer[this.len % BUFLEN] = this.currentByte;
- this.len++;
-
- if (this.len > BUFLEN) {
- boolean terminated = true;
- for (int i = 0; i < BUFLEN; i++) {
- if (this.buffer[(this.len + i) % BUFLEN] != TERMINATOR[i]) {
- terminated = false;
- break;
- }
- }
-
- if (terminated) {
- final Graph gLocal = TinkerGraph.open();
- final Function<DetachedVertex, Vertex> vertexMaker = detachedVertex -> DetachedVertex.addTo(gLocal, detachedVertex);
- final Function<DetachedEdge, Edge> edgeMaker = detachedEdge -> DetachedEdge.addTo(gLocal, detachedEdge);
- try (InputStream in = new ByteArrayInputStream(this.output.toByteArray())) {
- return GRYO_READER.readVertex(in, Direction.BOTH, vertexMaker, edgeMaker);
- }
+ this.currentTotalLength++;
+ currentVertexLength++;
+ this.output.write(this.currentByte);
+
+ if (this.currentByte == TERMINATOR[terminatorLocation])
+ terminatorLocation++;
+ else
+ terminatorLocation = 0;
+
+ if (terminatorLocation >= TERMINATOR.length) {
+ final Graph gLocal = TinkerGraph.open();
+ final Function<DetachedVertex, Vertex> vertexMaker = detachedVertex -> DetachedVertex.addTo(gLocal, detachedVertex);
+ final Function<DetachedEdge, Edge> edgeMaker = detachedEdge -> DetachedEdge.addTo(gLocal, detachedEdge);
+ try (InputStream in = new ByteArrayInputStream(this.output.toByteArray())) {
+ return GRYO_READER.readVertex(in, Direction.BOTH, vertexMaker, edgeMaker);
}
}
}