You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/02/06 20:54:33 UTC

[02/30] incubator-tinkerpop git commit: GraphComputerTest now verifies that graph filters work -- GraphComputer.vertices() and GraphComputer.edges(). SparkGraphComputer implements graph filters correctly. TinkerGraph and Giraph throw UnsupportedOperationException

GraphComputerTest now verifies that graph filters work -- GraphComputer.vertices() and GraphComputer.edges(). SparkGraphComputer implements graph filters correctly. TinkerGraph and Giraph throw UnsupportedOperationException at this point (i.e. TODO). Had to add remove() methods to many of the inner anonymous Iterator classes in IteratorUtils and MultiIterator. Each of them simply calls remove() on the wrapped iterator, so the change is cleanly backwards compatible. The new GraphFilterAware interface allows InputFormats to declare whether or not they perform vertex/edge filtering on graph load. Nothing is connected to it yet, but GryoInputFormat (and smart providers) will be able to leverage this interface. Still a work in progress...


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/3b3e008c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/3b3e008c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/3b3e008c

Branch: refs/heads/master
Commit: 3b3e008ce03d1f63610b92ff79886376d9dc55f7
Parents: 873174e
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Feb 1 12:38:42 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Feb 1 12:38:42 2016 -0700

----------------------------------------------------------------------
 .../gremlin/process/computer/GraphComputer.java |   2 +-
 .../gremlin/util/iterator/IteratorUtils.java    |  15 ++
 .../gremlin/util/iterator/MultiIterator.java    |   5 +
 .../process/computer/GraphComputerTest.java     | 158 +++++++++++++++++++
 .../computer/AbstractHadoopGraphComputer.java   |   4 +-
 .../hadoop/structure/io/GraphFilterAware.java   |  39 +++++
 .../spark/process/computer/SparkExecutor.java   |  39 ++---
 .../process/computer/SparkGraphComputer.java    |  88 ++++++-----
 .../process/computer/TinkerGraphComputer.java   |  12 ++
 9 files changed, 302 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3b3e008c/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
index 1494c28..1156978 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
@@ -109,7 +109,7 @@ public interface GraphComputer {
 
     public GraphComputer vertices(final Traversal<Vertex, Vertex> vertexFilter);
 
-    public GraphComputer edges(final Traversal<Vertex, Edge> edgeFilter);
+    public GraphComputer edges(final Traversal<Edge, Edge> edgeFilter);
 
     /**
      * Set an arbitrary configuration key/value for the underlying {@link org.apache.commons.configuration.Configuration} in the {@link GraphComputer}.

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3b3e008c/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
index dc38a07..2c25b14 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
@@ -231,6 +231,11 @@ public final class IteratorUtils {
             public E next() {
                 return function.apply(iterator.next());
             }
+
+            @Override
+            public void remove() {
+                iterator.remove();
+            }
         };
     }
 
@@ -257,6 +262,11 @@ public final class IteratorUtils {
             }
 
             @Override
+            public void remove() {
+                iterator.remove();
+            }
+
+            @Override
             public S next() {
                 try {
                     if (null != this.nextResult) {
@@ -312,6 +322,11 @@ public final class IteratorUtils {
             }
 
             @Override
+            public void remove() {
+                this.currentIterator.remove();
+            }
+
+            @Override
             public E next() {
                 if (this.hasNext())
                     return this.currentIterator.next();

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3b3e008c/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/MultiIterator.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/MultiIterator.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/MultiIterator.java
index 02c15f4..fca220b 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/MultiIterator.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/MultiIterator.java
@@ -58,6 +58,11 @@ public final class MultiIterator<T> implements Iterator<T>, Serializable {
     }
 
     @Override
+    public void remove() {
+        this.iterators.get(this.current).remove();
+    }
+
+    @Override
     public T next() {
         if (this.iterators.isEmpty()) throw FastNoSuchElementException.instance();
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3b3e008c/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
index 1af0830..361e402 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
@@ -24,11 +24,16 @@ import org.apache.tinkerpop.gremlin.LoadGraphWith;
 import org.apache.tinkerpop.gremlin.process.AbstractGremlinProcessTest;
 import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
 import org.apache.tinkerpop.gremlin.process.computer.util.StaticVertexProgram;
+import org.apache.tinkerpop.gremlin.process.traversal.P;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
 import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.VertexProperty;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -130,6 +135,16 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         }
 
         @Override
+        public GraphComputer vertices(final Traversal<Vertex, Vertex> vertexFilter) {
+            return null;
+        }
+
+        @Override
+        public GraphComputer edges(final Traversal<Edge, Edge> edgeFilter) {
+            return null;
+        }
+
+        @Override
         public GraphComputer configure(final String key, final Object value) {
             return null;
         }
@@ -1466,4 +1481,147 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
 
     }
 
+    /////////////////////////////////////////////
+
+    @Test
+    @LoadGraphWith(MODERN)
+    public void shouldSupportVertexAndEdgeFilters() throws Exception {
+        graph.compute(graphComputerClass.get()).vertices(__.hasLabel("software")).program(new VertexProgramM(VertexProgramM.SOFTWARE_ONLY)).submit().get();
+        graph.compute(graphComputerClass.get()).vertices(__.hasLabel("person")).program(new VertexProgramM(VertexProgramM.PEOPLE_ONLY)).submit().get();
+        graph.compute(graphComputerClass.get()).edges(__.hasLabel("knows")).program(new VertexProgramM(VertexProgramM.KNOWS_ONLY)).submit().get();
+        graph.compute(graphComputerClass.get()).vertices(__.hasLabel("person")).edges(__.hasLabel("knows")).program(new VertexProgramM(VertexProgramM.PEOPLE_KNOWS_ONLY)).submit().get();
+        graph.compute(graphComputerClass.get()).vertices(__.hasLabel("person")).edges(__.<Edge>hasLabel("knows").has("weight", P.gt(0.5f))).program(new VertexProgramM(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).submit().get();
+    }
+
+    public static class VertexProgramM implements VertexProgram {
+
+        public static final String SOFTWARE_ONLY = "softwareOnly";
+        public static final String PEOPLE_ONLY = "peopleOnly";
+        public static final String KNOWS_ONLY = "knowsOnly";
+        public static final String PEOPLE_KNOWS_ONLY = "peopleKnowsOnly";
+        public static final String PEOPLE_KNOWS_WELL_ONLY = "peopleKnowsWellOnly";
+
+        private String state;
+
+        public VertexProgramM() {
+
+        }
+
+        public VertexProgramM(final String state) {
+            this.state = state;
+        }
+
+        @Override
+        public void setup(final Memory memory) {
+
+        }
+
+        @Override
+        public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
+            switch (this.state) {
+                case SOFTWARE_ONLY: {
+                    assertEquals("software", vertex.label());
+                    assertFalse(vertex.edges(Direction.OUT).hasNext());
+                    assertTrue(vertex.edges(Direction.IN).hasNext());
+                    assertTrue(vertex.edges(Direction.IN, "created").hasNext());
+                    assertFalse(vertex.edges(Direction.IN, "knows").hasNext());
+                    break;
+                }
+                case PEOPLE_ONLY: {
+                    assertEquals("person", vertex.label());
+                    assertFalse(vertex.edges(Direction.IN, "created").hasNext());
+                    assertTrue(IteratorUtils.count(vertex.edges(Direction.BOTH)) > 0);
+                    break;
+                }
+                case KNOWS_ONLY: {
+                    assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH, "created")));
+                    if (vertex.value("name").equals("marko"))
+                        assertEquals(2, IteratorUtils.count(vertex.edges(Direction.BOTH, "knows")));
+                    else if (vertex.value("name").equals("vadas"))
+                        assertEquals(1, IteratorUtils.count(vertex.edges(Direction.IN, "knows")));
+                    else if (vertex.value("name").equals("josh"))
+                        assertEquals(1, IteratorUtils.count(vertex.edges(Direction.IN, "knows")));
+                    else {
+                        assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH, "knows")));
+                        assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH)));
+                    }
+                    break;
+                }
+                case PEOPLE_KNOWS_ONLY: {
+                    assertEquals("person", vertex.label());
+                    assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH, "created")));
+                    if (vertex.value("name").equals("marko"))
+                        assertEquals(2, IteratorUtils.count(vertex.edges(Direction.BOTH, "knows")));
+                    else if (vertex.value("name").equals("vadas"))
+                        assertEquals(1, IteratorUtils.count(vertex.edges(Direction.IN, "knows")));
+                    else if (vertex.value("name").equals("josh"))
+                        assertEquals(1, IteratorUtils.count(vertex.edges(Direction.IN, "knows")));
+                    else {
+                        assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH, "knows")));
+                        assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH)));
+                    }
+                    break;
+                }
+                case PEOPLE_KNOWS_WELL_ONLY: {
+                    assertEquals("person", vertex.label());
+                    assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH, "created")));
+                    if (vertex.value("name").equals("marko")) {
+                        assertEquals(1, IteratorUtils.count(vertex.edges(Direction.BOTH, "knows")));
+                        assertEquals(1.0, vertex.edges(Direction.OUT, "knows").next().value("weight"), 0.001);
+                    } else if (vertex.value("name").equals("vadas"))
+                        assertEquals(0, IteratorUtils.count(vertex.edges(Direction.IN, "knows")));
+                    else if (vertex.value("name").equals("josh")) {
+                        assertEquals(1, IteratorUtils.count(vertex.edges(Direction.IN, "knows")));
+                        assertEquals(1.0, vertex.edges(Direction.IN, "knows").next().value("weight"), 0.001);
+                    } else {
+                        assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH, "knows")));
+                        assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH)));
+                    }
+                    break;
+                }
+                default:
+                    throw new IllegalStateException("This is an illegal state for this test case: " + this.state);
+            }
+        }
+
+        @Override
+        public boolean terminate(final Memory memory) {
+            return true;
+        }
+
+        @Override
+        public Set<MessageScope> getMessageScopes(Memory memory) {
+            return Collections.emptySet();
+        }
+
+        @Override
+        public GraphComputer.ResultGraph getPreferredResultGraph() {
+            return GraphComputer.ResultGraph.NEW;
+        }
+
+        @Override
+        public GraphComputer.Persist getPreferredPersist() {
+            return GraphComputer.Persist.NOTHING;
+        }
+
+        @Override
+        @SuppressWarnings("CloneDoesntCallSuperClone,CloneDoesntDeclareCloneNotSupportedException")
+        public VertexProgramM clone() {
+            return new VertexProgramM(this.state);
+        }
+
+        @Override
+        public void loadState(final Graph graph, final Configuration configuration) {
+            this.state = configuration.getString("state");
+        }
+
+        @Override
+        public void storeState(final Configuration configuration) {
+            configuration.setProperty("state", this.state);
+            VertexProgram.super.storeState(configuration);
+        }
+
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3b3e008c/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
index 9d36323..2ed6d9f 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
@@ -56,7 +56,7 @@ public abstract class AbstractHadoopGraphComputer implements GraphComputer {
     protected Persist persist = null;
 
     protected Traversal.Admin<Vertex, Vertex> vertexFilter = null;
-    protected Traversal.Admin<Vertex, Edge> edgeFilter = null;
+    protected Traversal.Admin<Edge, Edge> edgeFilter = null;
 
     public AbstractHadoopGraphComputer(final HadoopGraph hadoopGraph) {
         this.hadoopGraph = hadoopGraph;
@@ -70,7 +70,7 @@ public abstract class AbstractHadoopGraphComputer implements GraphComputer {
     }
 
     @Override
-    public GraphComputer edges(final Traversal<Vertex, Edge> edgeFilter) {
+    public GraphComputer edges(final Traversal<Edge, Edge> edgeFilter) {
         this.edgeFilter = edgeFilter.asAdmin();
         return this;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3b3e008c/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/GraphFilterAware.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/GraphFilterAware.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/GraphFilterAware.java
new file mode 100644
index 0000000..a840523
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/GraphFilterAware.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.structure.io;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+/**
+ * An input graph class is {@code GraphFilterAware} if it can filter out vertices and edges as it is loading the graph from the
+ * source data. Any input class that is {@code GraphFilterAware} must be able to fully handle both vertex and edge filtering.
+ * It is assumed that if the input class is {@code GraphFilterAware}, then the respective
+ * {@link org.apache.tinkerpop.gremlin.process.computer.GraphComputer} will not perform any filtering on the loaded graph.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface GraphFilterAware {
+
+    public void setVertexFilter(final Traversal<Vertex, Vertex> vertexFilter);
+
+    public void setEdgeFilter(final Traversal<Edge, Edge> edgeFilter);
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3b3e008c/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index ec9fdd2..d034c92 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -29,13 +29,13 @@ import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.Payload;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload;
+import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.Attachable;
@@ -46,6 +46,7 @@ import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import scala.Tuple2;
 
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
@@ -63,23 +64,25 @@ public final class SparkExecutor {
     // DATA LOADING //
     //////////////////
 
-    public static JavaPairRDD<Object, VertexWritable> filterLoadedGraph(JavaPairRDD<Object, VertexWritable> graphRDD, final Traversal.Admin<Vertex, Vertex> vertexFilter, final Traversal.Admin<Vertex, Edge> edgeFilter) {
-        if (null != vertexFilter) {
-            graphRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
-                final Traversal.Admin<Vertex, Vertex> vFilter = vertexFilter.clone();
-                return () -> IteratorUtils.filter(partitionIterator, tuple -> TraversalUtil.test(tuple._2().get(), vFilter));
-            }, true);
-        }
-        if (null != edgeFilter) {
-            graphRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
-                final Traversal.Admin<Vertex, Edge> eFilter = (Traversal.Admin) __.not(edgeFilter.clone()).drop().asAdmin();
-                return () -> IteratorUtils.map(partitionIterator, tuple -> {
-                    IteratorUtils.iterate(TraversalUtil.applyAll(tuple._2().get(), eFilter));
-                    return tuple;
-                });
-            }, true);
-        }
-        return graphRDD;
+    public static JavaPairRDD<Object, VertexWritable> filterLoadedGraph(JavaPairRDD<Object, VertexWritable> graphRDD, final Traversal.Admin<Vertex, Vertex> vertexFilter, final Traversal.Admin<Edge, Edge> edgeFilter) {
+        return graphRDD.mapPartitionsToPair(partitionIterator -> {
+            final Traversal.Admin<Vertex, Vertex> vFilter = (null == vertexFilter) ? null : vertexFilter.clone();
+            final Traversal.Admin<Edge, Edge> eFilter = (null == edgeFilter) ? null : edgeFilter.clone();
+            return () -> IteratorUtils.filter(partitionIterator, tuple -> {
+                if (null != vFilter && !TraversalUtil.test(tuple._2().get(), vFilter))
+                    return false;
+                else {
+                    if (null != eFilter) {
+                        final Iterator<Edge> edgeIterator = tuple._2().get().edges(Direction.BOTH);
+                        while (edgeIterator.hasNext()) {
+                            if (!TraversalUtil.test(edgeIterator.next(), eFilter))
+                                edgeIterator.remove();
+                        }
+                    }
+                    return true;
+                }
+            });
+        }, true);
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3b3e008c/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 3104ba2..75e7d6c 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.spark.HashPartitioner;
+import org.apache.spark.Partitioner;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -40,6 +41,7 @@ import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.ComputerSubmiss
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.GraphFilterAware;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
@@ -131,7 +133,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
             }
         }
 
-        // create the completable future
+        // create the completable future                                                    
         return CompletableFuture.<ComputerResult>supplyAsync(() -> {
             final long startTime = System.currentTimeMillis();
             final Storage fileSystemStorage = FileSystemStorage.open(hadoopConfiguration);
@@ -140,6 +142,21 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
             final boolean inputFromSpark = PersistedInputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, Object.class));
             final boolean outputToHDFS = FileOutputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, Object.class));
             final boolean outputToSpark = PersistedOutputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, Object.class));
+            final InputRDD inputRDD;
+            final OutputRDD outputRDD;
+            try {
+                inputRDD = hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, InputFormatRDD.class, InputRDD.class).newInstance();
+                outputRDD = hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, OutputFormatRDD.class, OutputRDD.class).newInstance();
+                if (inputRDD instanceof GraphFilterAware) { // if the input class can filter on load, then set the filters
+                    if (null != this.vertexFilter)
+                        ((GraphFilterAware) inputRDD).setVertexFilter(this.vertexFilter);
+                    if (null != edgeFilter)
+                        ((GraphFilterAware) inputRDD).setEdgeFilter(this.edgeFilter);
+                }
+            } catch (final InstantiationException | IllegalAccessException e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
+            final boolean filtered = (this.vertexFilter != null || this.edgeFilter != null) && !(inputRDD instanceof GraphFilterAware);
             SparkMemory memory = null;
             // delete output location
             final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
@@ -149,11 +166,12 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 if (outputToSpark && sparkContextStorage.exists(outputLocation))
                     sparkContextStorage.rm(outputLocation);
             }
-            // wire up a spark context
-            final SparkConf sparkConfiguration = new SparkConf();
-            sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + (null == this.vertexProgram ? "No VertexProgram" : this.vertexProgram) + "[" + this.mapReducers + "]");
+
+            // the Spark application name will always be set by SparkContextStorage, thus, INFO the name to make it easier to debug
+            logger.info(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + (null == this.vertexProgram ? "No VertexProgram" : this.vertexProgram) + "[" + this.mapReducers + "]");
 
             // create the spark configuration from the graph computer configuration
+            final SparkConf sparkConfiguration = new SparkConf();
             hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
             // execute the vertex program and map reducers and if there is a failure, auto-close the spark context
             try {
@@ -162,44 +180,42 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 Spark.create(sparkContext.sc()); // this is the context RDD holder that prevents GC
                 updateLocalConfiguration(sparkContext, sparkConfiguration);
                 // create a message-passing friendly rdd from the input rdd
-                JavaPairRDD<Object, VertexWritable> loadedGraphRDD;
                 JavaPairRDD<Object, VertexWritable> computedGraphRDD = null;
                 boolean partitioned = false;
-                try {
-                    loadedGraphRDD = hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, InputFormatRDD.class, InputRDD.class)
-                            .newInstance()
-                            .readGraphRDD(apacheConfiguration, sparkContext);
-
+                JavaPairRDD<Object, VertexWritable> loadedGraphRDD = inputRDD.readGraphRDD(apacheConfiguration, sparkContext);
+                // if there are vertex or edge filters, filter the loaded graph rdd prior to partitioning and persisting
+                if (filtered) {
+                    this.logger.info("Filtering the loaded graphRDD: " + this.vertexFilter + " and " + this.edgeFilter);
                     loadedGraphRDD = SparkExecutor.filterLoadedGraph(loadedGraphRDD, this.vertexFilter, this.edgeFilter);
-
-                    if (loadedGraphRDD.partitioner().isPresent())
-                        this.logger.info("Using the existing partitioner associated with the loaded graphRDD: " + loadedGraphRDD.partitioner().get());
-                    else {
-                        loadedGraphRDD = loadedGraphRDD.partitionBy(new HashPartitioner(this.workersSet ? this.workers : loadedGraphRDD.partitions().size()));
-                        partitioned = true;
-                    }
-                    assert loadedGraphRDD.partitioner().isPresent();
-                    // if the loaded graphRDD was already partitioned previous, then this coalesce/repartition will not take place
-                    if (this.workersSet) {
-                        if (loadedGraphRDD.partitions().size() > this.workers) // ensures that the loaded graphRDD does not have more partitions than workers
-                            loadedGraphRDD = loadedGraphRDD.coalesce(this.workers);
-                        else if (loadedGraphRDD.partitions().size() < this.workers) // ensures that the loaded graphRDD does not have less partitions than workers
-                            loadedGraphRDD = loadedGraphRDD.repartition(this.workers);
-                    }
-                    // persist the vertex program loaded graph as specified by configuration or else use default cache() which is MEMORY_ONLY
-                    if (!inputFromSpark || partitioned)
-                        loadedGraphRDD = loadedGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
-                } catch (final InstantiationException | IllegalAccessException e) {
-                    throw new IllegalStateException(e.getMessage(), e);
                 }
+                // if the loaded graph RDD is already partitioned use that partitioner, else partition it with HashPartitioner
+                if (loadedGraphRDD.partitioner().isPresent())
+                    this.logger.info("Using the existing partitioner associated with the loaded graphRDD: " + loadedGraphRDD.partitioner().get());
+                else {
+                    final Partitioner partitioner = new HashPartitioner(this.workersSet ? this.workers : loadedGraphRDD.partitions().size());
+                    this.logger.info("Partitioning the loaded graphRDD: " + partitioner);
+                    loadedGraphRDD = loadedGraphRDD.partitionBy(partitioner);
+                    partitioned = true;
+                }
+                assert loadedGraphRDD.partitioner().isPresent();
+                // if the loaded graphRDD was already partitioned previous, then this coalesce/repartition will not take place
+                if (this.workersSet) {
+                    if (loadedGraphRDD.partitions().size() > this.workers) // ensures that the loaded graphRDD does not have more partitions than workers
+                        loadedGraphRDD = loadedGraphRDD.coalesce(this.workers);
+                    else if (loadedGraphRDD.partitions().size() < this.workers) // ensures that the loaded graphRDD does not have less partitions than workers
+                        loadedGraphRDD = loadedGraphRDD.repartition(this.workers);
+                }
+                // persist the vertex program loaded graph as specified by configuration or else use default cache() which is MEMORY_ONLY
+                if (!inputFromSpark || partitioned || filtered)
+                    loadedGraphRDD = loadedGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
 
-                JavaPairRDD<Object, ViewIncomingPayload<Object>> viewIncomingRDD = null;
 
                 ////////////////////////////////
                 // process the vertex program //
                 ////////////////////////////////
                 if (null != this.vertexProgram) {
                     // set up the vertex program and wire up configurations
+                    JavaPairRDD<Object, ViewIncomingPayload<Object>> viewIncomingRDD = null;
                     memory = new SparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
                     this.vertexProgram.setup(memory);
                     memory.broadcastMemory(sparkContext);
@@ -224,13 +240,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                     computedGraphRDD = SparkExecutor.prepareFinalGraphRDD(loadedGraphRDD, viewIncomingRDD, elementComputeKeys);
                     if ((hadoopConfiguration.get(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, null) != null ||
                             hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null) != null) && !this.persist.equals(Persist.NOTHING)) {
-                        try {
-                            hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, OutputFormatRDD.class, OutputRDD.class)
-                                    .newInstance()
-                                    .writeGraphRDD(apacheConfiguration, computedGraphRDD);
-                        } catch (final InstantiationException | IllegalAccessException e) {
-                            throw new IllegalStateException(e.getMessage(), e);
-                        }
+                        outputRDD.writeGraphRDD(apacheConfiguration, computedGraphRDD);
                     }
                 }
 
@@ -279,7 +289,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
 
                 // unpersist the loaded graph if it will not be used again (no PersistedInputRDD)
                 // if the graphRDD was loaded from Spark, but then partitioned, its a different RDD
-                if ((!inputFromSpark || partitioned) && computedGraphCreated)
+                if ((!inputFromSpark || partitioned || filtered) && computedGraphCreated)
                     loadedGraphRDD.unpersist();
                 // unpersist the computed graph if it will not be used again (no PersistedOutputRDD)
                 if (!outputToSpark || this.persist.equals(GraphComputer.Persist.NOTHING))

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3b3e008c/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index 07ad0c8..13f4fa3 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -25,6 +25,8 @@ import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
 import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
@@ -92,6 +94,16 @@ public final class TinkerGraphComputer implements GraphComputer {
     }
 
     @Override
+    public GraphComputer vertices(final Traversal<Vertex, Vertex> vertexFilter) {
+        throw new UnsupportedOperationException(); // TODO: vertex graph filters are not yet supported by TinkerGraphComputer
+    }
+
+    @Override
+    public GraphComputer edges(final Traversal<Edge, Edge> edgeFilter) {
+        throw new UnsupportedOperationException(); // TODO: edge graph filters are not yet supported by TinkerGraphComputer
+    }
+
+    @Override
     public Future<ComputerResult> submit() {
         // a graph computer can only be executed once
         if (this.executed)