You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/02/04 00:30:46 UTC

incubator-tinkerpop git commit: GraphFilter is now embedded at the StarGraph deserialization level. Thus, if an edge isn't needed, it's not even manifested as an object. This is in a TEST branch to determine performance gains (if any) on the Blade cluster.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-962-TEST [created] 79bbee911


GraphFilter is now embedded at the StarGraph deserialization level. Thus, if an edge isn't needed, it's not even manifested as an object. This is in a TEST branch to determine performance gains (if any) on the Blade cluster.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/79bbee91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/79bbee91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/79bbee91

Branch: refs/heads/TINKERPOP-962-TEST
Commit: 79bbee9119b464d91508d4ec4d2281806693fa2c
Parents: 001a13d
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Feb 3 16:30:49 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Feb 3 16:30:49 2016 -0700

----------------------------------------------------------------------
 .../gremlin/process/computer/GraphFilter.java   | 13 ++++++++++++
 .../gremlin/structure/io/gryo/GryoReader.java   | 10 +++++++++
 .../util/star/StarGraphGryoSerializer.java      | 22 +++++++++++++++-----
 .../structure/io/CommonFileInputFormat.java     |  2 +-
 .../structure/io/gryo/GryoInputFormat.java      |  2 +-
 .../structure/io/gryo/GryoRecordReader.java     |  9 +++++++-
 .../structure/io/script/ScriptInputFormat.java  |  5 +----
 7 files changed, 51 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/79bbee91/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphFilter.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphFilter.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphFilter.java
index e6083a1..cf45f69 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphFilter.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphFilter.java
@@ -102,6 +102,19 @@ public final class GraphFilter implements Cloneable, Serializable {
         return this.vertexFilter != null;
     }
 
+    public boolean maybeLegalEdge(final Direction direction, final String label) {
+        if (null == this.edgeFilter)
+            return true;
+        else if (this.allowNoEdges)
+            return false;
+        else if (!direction.equals(Direction.BOTH) && !this.allowedEdgeDirection.equals(direction))
+            return false;
+        else if (!this.allowedEdgeLabels.isEmpty() && !this.allowedEdgeLabels.contains(label))
+            return false;
+        else
+            return true;
+    }
+
     @Override
     public GraphFilter clone() {
         try {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/79bbee91/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoReader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoReader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoReader.java
index d457ffb..b47d210 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoReader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoReader.java
@@ -34,6 +34,7 @@ import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedEdge;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedProperty;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
 import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
+import org.apache.tinkerpop.gremlin.structure.util.star.StarGraphGryoSerializer;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import org.apache.tinkerpop.shaded.kryo.Kryo;
 import org.apache.tinkerpop.shaded.kryo.io.Input;
@@ -140,6 +141,15 @@ public final class GryoReader implements GraphReader {
         return readVertex(inputStream, vertexAttachMethod, null, null);
     }
 
+    public Vertex readVertex(final InputStream inputStream, final StarGraphGryoSerializer serializer) throws IOException {
+        final Input input = new Input(inputStream);
+        readHeader(input);
+        final StarGraph starGraph = kryo.readObject(input, StarGraph.class, serializer);
+        // read the terminator
+        kryo.readClassAndObject(input);
+        return starGraph.getStarVertex();
+    }
+
     /**
      * Read a {@link Vertex} from output generated by any of the {@link GryoWriter} {@code writeVertex} or
      * {@code writeVertices} methods or by {@link GryoWriter#writeGraph(OutputStream, Graph)}.

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/79bbee91/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphGryoSerializer.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphGryoSerializer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphGryoSerializer.java
index 5bcfe80..d3b80d7 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphGryoSerializer.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphGryoSerializer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.structure.util.star;
 
+import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
 import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.T;
@@ -46,6 +47,7 @@ public final class StarGraphGryoSerializer extends Serializer<StarGraph> {
     private static final Map<Direction, StarGraphGryoSerializer> CACHE = new HashMap<>();
 
     private final Direction edgeDirectionToSerialize;
+    private GraphFilter graphFilter = null;
 
     private final static byte VERSION_1 = Byte.MIN_VALUE;
 
@@ -68,6 +70,12 @@ public final class StarGraphGryoSerializer extends Serializer<StarGraph> {
         return CACHE.get(direction);
     }
 
+    public static StarGraphGryoSerializer withGraphFilter(final GraphFilter graphFilter) {
+        final StarGraphGryoSerializer serializer = new StarGraphGryoSerializer(Direction.BOTH);
+        serializer.graphFilter = graphFilter.clone();
+        return serializer;
+    }
+
     @Override
     public void write(final Kryo kryo, final Output output, final StarGraph starGraph) {
         output.writeByte(VERSION_1);
@@ -135,7 +143,7 @@ public final class StarGraphGryoSerializer extends Serializer<StarGraph> {
         }
     }
 
-    private static void readEdges(final Kryo kryo, final Input input, final StarGraph starGraph, final Direction direction) {
+    private void readEdges(final Kryo kryo, final Input input, final StarGraph starGraph, final Direction direction) {
         if (kryo.readObject(input, Boolean.class)) {
             final int numberOfUniqueLabels = kryo.readObject(input, Integer.class);
             for (int i = 0; i < numberOfUniqueLabels; i++) {
@@ -144,10 +152,14 @@ public final class StarGraphGryoSerializer extends Serializer<StarGraph> {
                 for (int j = 0; j < numberOfEdgesWithLabel; j++) {
                     final Object edgeId = kryo.readClassAndObject(input);
                     final Object adjacentVertexId = kryo.readClassAndObject(input);
-                    if (direction.equals(Direction.OUT))
-                        starGraph.starVertex.addOutEdge(edgeLabel, starGraph.addVertex(T.id, adjacentVertexId), T.id, edgeId);
-                    else
-                        starGraph.starVertex.addInEdge(edgeLabel, starGraph.addVertex(T.id, adjacentVertexId), T.id, edgeId);
+                    if (null == this.graphFilter || this.graphFilter.maybeLegalEdge(direction, edgeLabel)) {
+                        if (direction.equals(Direction.OUT))
+                            starGraph.starVertex.addOutEdge(edgeLabel, starGraph.addVertex(T.id, adjacentVertexId), T.id, edgeId);
+                        else
+                            starGraph.starVertex.addInEdge(edgeLabel, starGraph.addVertex(T.id, adjacentVertexId), T.id, edgeId);
+                    } else {
+                        starGraph.edgeProperties.remove(edgeId);
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/79bbee91/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileInputFormat.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileInputFormat.java
index 9f7d458..b83344f 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileInputFormat.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileInputFormat.java
@@ -38,7 +38,7 @@ public abstract class CommonFileInputFormat extends FileInputFormat<NullWritable
     protected GraphFilter graphFilter = new GraphFilter();
     private boolean graphFilterLoaded = false;
 
-    protected void loadVertexAndEdgeFilters(final TaskAttemptContext context) {
+    protected void loadGraphFilter(final TaskAttemptContext context) {
         if (!this.graphFilterLoaded) {
             if (context.getConfiguration().get(Constants.GREMLIN_HADOOP_GRAPH_FILTER, null) != null)
                 this.graphFilter = VertexProgramHelper.deserialize(ConfUtil.makeApacheConfiguration(context.getConfiguration()), Constants.GREMLIN_HADOOP_GRAPH_FILTER);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/79bbee91/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoInputFormat.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoInputFormat.java
index 78ece7c..99d8245 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoInputFormat.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoInputFormat.java
@@ -34,7 +34,7 @@ public final class GryoInputFormat extends CommonFileInputFormat {
 
     @Override
     public RecordReader<NullWritable, VertexWritable> createRecordReader(final InputSplit split, final TaskAttemptContext context) throws IOException, InterruptedException {
-        super.loadVertexAndEdgeFilters(context);
+        super.loadGraphFilter(context);
         final RecordReader<NullWritable, VertexWritable> reader = new GryoRecordReader(this.graphFilter);
         reader.initialize(split, context);
         return reader;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/79bbee91/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
index e489a15..1c88b7b 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
@@ -36,6 +36,7 @@ import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.VertexTerminator;
 import org.apache.tinkerpop.gremlin.structure.util.Attachable;
 import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
+import org.apache.tinkerpop.gremlin.structure.util.star.StarGraphGryoSerializer;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -53,6 +54,7 @@ public final class GryoRecordReader extends RecordReader<NullWritable, VertexWri
     private static final byte[] TERMINATOR = VertexTerminator.instance().terminal;
 
     private GryoReader gryoReader;
+    private StarGraphGryoSerializer starGraphGryoSerializer = null;
     private final VertexWritable vertexWritable = new VertexWritable();
 
     private long currentLength = 0;
@@ -61,6 +63,9 @@ public final class GryoRecordReader extends RecordReader<NullWritable, VertexWri
 
     public GryoRecordReader(final GraphFilter graphFilter) {
         this.graphFilter = graphFilter.clone();
+        if (this.graphFilter.hasFilter()) {
+            this.starGraphGryoSerializer = StarGraphGryoSerializer.withGraphFilter(this.graphFilter);
+        }
     }
 
     @Override
@@ -128,7 +133,9 @@ public final class GryoRecordReader extends RecordReader<NullWritable, VertexWri
             terminatorLocation = ((byte) currentByte) == TERMINATOR[terminatorLocation] ? terminatorLocation + 1 : 0;
             if (terminatorLocation >= TERMINATOR.length) {
                 try (InputStream in = new ByteArrayInputStream(output.toByteArray())) {
-                    final Vertex vertex = this.graphFilter.applyGraphFilter((StarGraph.StarVertex) this.gryoReader.readVertex(in, Attachable::get)); // I know how GryoReader works, so I'm cheating here
+                    final Vertex vertex = this.graphFilter.applyGraphFilter(null == this.starGraphGryoSerializer ?
+                            (StarGraph.StarVertex) this.gryoReader.readVertex(in, Attachable::get) :    // I know how GryoReader works, so I'm cheating here
+                            (StarGraph.StarVertex) this.gryoReader.readVertex(in, this.starGraphGryoSerializer));
                     if (null != vertex) {
                         this.vertexWritable.set(vertex);
                         return true;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/79bbee91/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptInputFormat.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptInputFormat.java
index 8b9af01..efec4e7 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptInputFormat.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptInputFormat.java
@@ -18,11 +18,8 @@
  */
 package org.apache.tinkerpop.gremlin.hadoop.structure.io.script;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.CommonFileInputFormat;
@@ -38,7 +35,7 @@ public final class ScriptInputFormat extends CommonFileInputFormat {
 
     @Override
     public RecordReader<NullWritable, VertexWritable> createRecordReader(final InputSplit split, final TaskAttemptContext context) throws IOException, InterruptedException {
-        super.loadVertexAndEdgeFilters(context);
+        super.loadGraphFilter(context);
         RecordReader<NullWritable, VertexWritable> reader = new ScriptRecordReader(this.graphFilter);
         reader.initialize(split, context);
         return reader;