You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/12 19:59:46 UTC

incubator-tinkerpop git commit: Optimizations to VertexStreamIterator, and static XXXReader/Writers are no longer used with Hadoop InputFormats.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master c226ea285 -> 53edaae12


Optimizations to VertexStreamIterator, and static XXXReader/Writers are no longer used with Hadoop InputFormats.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/53edaae1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/53edaae1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/53edaae1

Branch: refs/heads/master
Commit: 53edaae1231b743870227bb77ce729578184a38b
Parents: c226ea2
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Mar 12 12:59:32 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Mar 12 12:59:41 2015 -0600

----------------------------------------------------------------------
 .../traversal/step/sideEffect/ProfileTest.java  |  4 +-
 .../computer/giraph/GiraphComputeVertex.java    | 18 ++++--
 .../hadoop/structure/io/VertexWritable.java     |  4 +-
 .../io/graphson/GraphSONRecordReader.java       |  2 +-
 .../io/graphson/GraphSONRecordWriter.java       |  2 +-
 .../structure/io/gryo/VertexStreamIterator.java | 63 ++++++++------------
 6 files changed, 45 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/53edaae1/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/ProfileTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/ProfileTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/ProfileTest.java
index 4ed5954..a5bc114 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/ProfileTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/ProfileTest.java
@@ -140,14 +140,14 @@ public abstract class ProfileTest extends AbstractGremlinProcessTest {
         assertTrue("Duration should be at least the length of the sleep: " + metrics.getDuration(TimeUnit.MILLISECONDS),
                 metrics.getDuration(TimeUnit.MILLISECONDS) >= 60);
         assertTrue("Check that duration is within tolerant range: " + metrics.getDuration(TimeUnit.MILLISECONDS),
-                metrics.getDuration(TimeUnit.MILLISECONDS) < 80);
+                metrics.getDuration(TimeUnit.MILLISECONDS) < 100);
 
         // 6 elements w/ a 5ms sleep each = 30ms plus 20ms for other computation
         metrics = traversalMetrics.getMetrics(2);
         assertTrue("Duration should be at least the length of the sleep: " + metrics.getDuration(TimeUnit.MILLISECONDS),
                 metrics.getDuration(TimeUnit.MILLISECONDS) >= 30);
         assertTrue("Check that duration is within tolerant range: " + metrics.getDuration(TimeUnit.MILLISECONDS),
-                metrics.getDuration(TimeUnit.MILLISECONDS) < 50);
+                metrics.getDuration(TimeUnit.MILLISECONDS) < 100);
 
         double totalPercentDuration = 0;
         for (Metrics m : traversalMetrics.getMetrics()) {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/53edaae1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
index f2407fe..670e239 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
@@ -57,14 +57,14 @@ public final class GiraphComputeVertex extends Vertex<LongWritable, Text, NullWr
     private static final String VERTEX_ID = Graph.Hidden.hide("giraph.gremlin.vertexId");
     private TinkerVertex tinkerVertex;
     private StrategyVertex wrappedVertex;
-    private static GryoWriter KRYO_WRITER = GryoWriter.build().create();
-    private static GryoReader KRYO_READER = GryoReader.build().create();
+    private GryoWriter KRYO_WRITER;
+    private GryoReader KRYO_READER;
 
     public GiraphComputeVertex() {
     }
 
     public GiraphComputeVertex(final org.apache.tinkerpop.gremlin.structure.Vertex vertex) {
-        this.tinkerVertex = GiraphComputeVertex.generateTinkerVertexForm(vertex);
+        this.tinkerVertex = this.generateTinkerVertexForm(vertex);
         this.tinkerVertex.graph().variables().set(VERTEX_ID, this.tinkerVertex.id());
         this.initialize(new LongWritable(Long.valueOf(this.tinkerVertex.id().toString())), this.deflateTinkerVertex(), EmptyOutEdges.instance());
     }
@@ -95,8 +95,16 @@ public final class GiraphComputeVertex extends Vertex<LongWritable, Text, NullWr
 
     ///////////////////////////////////////////////
 
+    private void checkReaderWriters() {
+        if (null == KRYO_READER)
+            KRYO_READER = GryoReader.build().create();
+        if (null == KRYO_WRITER)
+            KRYO_WRITER = GryoWriter.build().create();
+    }
+
     private Text deflateTinkerVertex() {
         try {
+            checkReaderWriters();
             final ByteArrayOutputStream bos = new ByteArrayOutputStream();
             KRYO_WRITER.writeGraph(bos, this.tinkerVertex.graph());
             bos.flush();
@@ -109,6 +117,7 @@ public final class GiraphComputeVertex extends Vertex<LongWritable, Text, NullWr
 
     private void inflateTinkerVertex() {
         try {
+            checkReaderWriters();
             final ByteArrayInputStream bis = new ByteArrayInputStream(this.getValue().getBytes());
             final TinkerGraph tinkerGraph = TinkerGraph.open();
             KRYO_READER.readGraph(bis, tinkerGraph);
@@ -119,11 +128,12 @@ public final class GiraphComputeVertex extends Vertex<LongWritable, Text, NullWr
         }
     }
 
-    private static final TinkerVertex generateTinkerVertexForm(final org.apache.tinkerpop.gremlin.structure.Vertex otherVertex) {
+    private final TinkerVertex generateTinkerVertexForm(final org.apache.tinkerpop.gremlin.structure.Vertex otherVertex) {
         if (otherVertex instanceof TinkerVertex)
             return (TinkerVertex) otherVertex;
         else {
             try {
+                checkReaderWriters();
                 final ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 KRYO_WRITER.writeVertex(bos, otherVertex, Direction.BOTH);
                 bos.flush();

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/53edaae1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java
index a42060c..bbf942d 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java
@@ -42,8 +42,8 @@ import java.io.IOException;
 public final class VertexWritable<V extends Vertex> implements Writable {
 
     private Vertex vertex;
-    private static final GryoWriter KRYO_WRITER = GryoWriter.build().create();
-    private static final GryoReader KRYO_READER = GryoReader.build().create();
+    private final GryoWriter KRYO_WRITER = GryoWriter.build().create();
+    private final GryoReader KRYO_READER = GryoReader.build().create();
 
     public VertexWritable(final Vertex vertex) {
         this.vertex = vertex;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/53edaae1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordReader.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordReader.java
index 95f5b46..ad082bf 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordReader.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordReader.java
@@ -43,7 +43,7 @@ import java.util.function.Function;
  */
 public class GraphSONRecordReader extends RecordReader<NullWritable, VertexWritable> {
 
-    private static final GraphSONReader GRAPHSON_READER = GraphSONReader.build().create();
+    private final GraphSONReader GRAPHSON_READER = GraphSONReader.build().create();
     private final VertexWritable vertex = new VertexWritable(null);
     private final LineRecordReader lineRecordReader;
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/53edaae1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordWriter.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordWriter.java
index 57ae456..d8f50b7 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordWriter.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordWriter.java
@@ -36,7 +36,7 @@ public class GraphSONRecordWriter extends RecordWriter<NullWritable, VertexWrita
     private static final String UTF8 = "UTF-8";
     private static final byte[] NEWLINE;
     private final DataOutputStream out;
-    private static final GraphSONWriter GRAPHSON_WRITER = GraphSONWriter.build().create();
+    private final GraphSONWriter GRAPHSON_WRITER = GraphSONWriter.build().create();
 
     static {
         try {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/53edaae1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/VertexStreamIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/VertexStreamIterator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/VertexStreamIterator.java
index b32a944..dcd5a49 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/VertexStreamIterator.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/VertexStreamIterator.java
@@ -43,18 +43,14 @@ public class VertexStreamIterator implements Iterator<VertexWritable> {
     // this is VertexTerminator's long terminal 4185403236219066774L as an array of positive int's
     private static final int[] TERMINATOR = new int[]{58, 21, 138, 17, 112, 155, 153, 150};
 
-    private static int BUFLEN = TERMINATOR.length;
-
     private final InputStream inputStream;
-    private static final GryoReader GRYO_READER = GryoReader.build().create();
+    private final GryoReader GRYO_READER = GryoReader.build().create();
     private final ByteArrayOutputStream output = new ByteArrayOutputStream();
-    private final int[] buffer = new int[BUFLEN];
 
-    private int len;
     private int currentByte;
+    private long currentTotalLength = 0;
     private Vertex currentVertex;
     private final long maxLength;
-    private long currentLength = 0;
 
     public VertexStreamIterator(final InputStream inputStream, final long maxLength) {
         this.inputStream = inputStream;
@@ -62,18 +58,18 @@ public class VertexStreamIterator implements Iterator<VertexWritable> {
     }
 
     public float getProgress() {
-        if (0 == this.currentLength || 0 == this.maxLength)
+        if (0 == this.currentTotalLength || 0 == this.maxLength)
             return 0.0f;
-        else if (this.currentLength >= this.maxLength || this.maxLength == Long.MAX_VALUE)
+        else if (this.currentTotalLength >= this.maxLength || this.maxLength == Long.MAX_VALUE)
             return 1.0f;
         else
-            return (float) this.currentLength / (float) this.maxLength;
+            return (float) this.currentTotalLength / (float) this.maxLength;
 
     }
 
     @Override
     public boolean hasNext() {
-        if (this.currentLength >= this.maxLength) // gone beyond the split boundary
+        if (this.currentTotalLength >= this.maxLength) // gone beyond the split boundary
             return false;
         if (null != this.currentVertex)
             return true;
@@ -101,45 +97,36 @@ public class VertexStreamIterator implements Iterator<VertexWritable> {
                 return new VertexWritable(this.currentVertex);
         } finally {
             this.currentVertex = null;
-            this.len = 0;
             this.output.reset();
         }
     }
 
     private final Vertex advanceToNextVertex() throws IOException {
+        long currentVertexLength = 0;
+        int terminatorLocation = 0;
         while (true) {
             this.currentByte = this.inputStream.read();
-            this.currentLength++;
             if (-1 == this.currentByte) {
-                if (this.len > 0) {
+                if (currentVertexLength > 0)
                     throw new IllegalStateException("Remainder of stream exhausted without matching a vertex");
-                } else {
+                else
                     return null;
-                }
             }
-
-            if (this.len >= BUFLEN)
-                this.output.write(this.buffer[this.len % BUFLEN]);
-
-            this.buffer[this.len % BUFLEN] = this.currentByte;
-            this.len++;
-
-            if (this.len > BUFLEN) {
-                boolean terminated = true;
-                for (int i = 0; i < BUFLEN; i++) {
-                    if (this.buffer[(this.len + i) % BUFLEN] != TERMINATOR[i]) {
-                        terminated = false;
-                        break;
-                    }
-                }
-
-                if (terminated) {
-                    final Graph gLocal = TinkerGraph.open();
-                    final Function<DetachedVertex, Vertex> vertexMaker = detachedVertex -> DetachedVertex.addTo(gLocal, detachedVertex);
-                    final Function<DetachedEdge, Edge> edgeMaker = detachedEdge -> DetachedEdge.addTo(gLocal, detachedEdge);
-                    try (InputStream in = new ByteArrayInputStream(this.output.toByteArray())) {
-                        return GRYO_READER.readVertex(in, Direction.BOTH, vertexMaker, edgeMaker);
-                    }
+            this.currentTotalLength++;
+            currentVertexLength++;
+            this.output.write(this.currentByte);
+
+            if (this.currentByte == TERMINATOR[terminatorLocation])
+                terminatorLocation++;
+            else
+                terminatorLocation = 0;
+
+            if (terminatorLocation >= TERMINATOR.length) {
+                final Graph gLocal = TinkerGraph.open();
+                final Function<DetachedVertex, Vertex> vertexMaker = detachedVertex -> DetachedVertex.addTo(gLocal, detachedVertex);
+                final Function<DetachedEdge, Edge> edgeMaker = detachedEdge -> DetachedEdge.addTo(gLocal, detachedEdge);
+                try (InputStream in = new ByteArrayInputStream(this.output.toByteArray())) {
+                    return GRYO_READER.readVertex(in, Direction.BOTH, vertexMaker, edgeMaker);
                 }
             }
         }