You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/02/06 20:54:59 UTC

[28/30] incubator-tinkerpop git commit: StarGraph is no longer special to GraphFilter. All the filtering methods for StarGraph that were weirdly in GraphFilter are now just in StarGraph -- i.e. StarGraph.applyGraphFilter(GraphFilter). We may want to crea

StarGraph is no longer special to GraphFilter. All the filtering methods for StarGraph that were weirdly in GraphFilter are now just in StarGraph -- i.e. StarGraph.applyGraphFilter(GraphFilter). We may want to create an interface moving forward like Filterable which requires an applyGraphFilter(GraphFilter) implementation. Added more test cases GraphComputerTest -- we now test graph filtering for the VERTEX_PROGRAM, VERTEX_PROGRAM+MAP_REDUCE, MAP_REDUCE. A slight optimization to MapReduceHelper (Giraph) where if there was a VERTEX_PROGRAM, no need to use a GraphFilter as its already been filtered by the VERTEX_PROGRAM InputFormat.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/482ccdad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/482ccdad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/482ccdad

Branch: refs/heads/master
Commit: 482ccdad79105871582e464217d363f735b87611
Parents: 4134254
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Feb 5 09:50:26 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Feb 5 09:50:26 2016 -0700

----------------------------------------------------------------------
 .../gremlin/process/computer/GraphFilter.java   |  97 ++++-------------
 .../gremlin/structure/util/star/StarGraph.java  | 109 +++++++++++++------
 .../util/star/StarGraphGryoSerializer.java      |   2 +-
 .../process/computer/GraphComputerTest.java     |  10 ++
 .../computer/GraphFilterRecordReader.java       |   2 +-
 .../process/computer/util/MapReduceHelper.java  |   4 +-
 .../structure/io/script/ScriptRecordReader.java |   2 +-
 .../spark/process/computer/SparkExecutor.java   |   2 +-
 8 files changed, 113 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/482ccdad/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphFilter.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphFilter.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphFilter.java
index d5d4186..0fb700a 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphFilter.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphFilter.java
@@ -27,17 +27,11 @@ import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
 import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
 
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 
 /**
@@ -45,7 +39,8 @@ import java.util.Set;
  * There are two types of filters: a {@link Vertex} filter and an {@link Edge} filter.
  * The vertex filter is a {@link Traversal} that can only check the id, label, and properties of the vertex.
  * The edge filter is a {@link Traversal} that starts at the vertex are emits all legal incident edges.
- * The use of GraphFilter can greatly reduce the amount of data processed by the {@link GraphComputer}.
+ * If no vertex filter is provided, then no vertices are filtered. If no edge filter is provided, then no edges are filtered.
+ * The use of a GraphFilter can greatly reduce the amount of data processed by the {@link GraphComputer}.
  * For instance, for {@code g.V().count()}, there is no reason to load edges, and thus, the edge filter can be {@code bothE().limit(0)}.
  *
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -58,6 +53,10 @@ public final class GraphFilter implements Cloneable, Serializable {
         public boolean positive() {
             return this != NO;
         }
+
+        public boolean negative() {
+            return this == NO;
+        }
     }
 
     private Traversal.Admin<Vertex, Vertex> vertexFilter = null;
@@ -66,7 +65,7 @@ public final class GraphFilter implements Cloneable, Serializable {
     private boolean allowNoEdges = false;
     private Direction allowedEdgeDirection = Direction.BOTH;
     private Set<String> allowedEdgeLabels = new HashSet<>();
-    private boolean allowAllRemainingEdges = false;
+    //private boolean allowAllRemainingEdges = false;
 
     public void setVertexFilter(final Traversal<Vertex, Vertex> vertexFilter) {
         if (!TraversalHelper.isLocalVertex(vertexFilter.asAdmin()))
@@ -84,16 +83,16 @@ public final class GraphFilter implements Cloneable, Serializable {
             this.allowedEdgeLabels.clear();
             this.allowedEdgeLabels.addAll(Arrays.asList(((VertexStep) this.edgeFilter.getStartStep()).getEdgeLabels()));
             this.allowedEdgeDirection = ((VertexStep) this.edgeFilter.getStartStep()).getDirection();
-            this.allowAllRemainingEdges = 1 == this.edgeFilter.getSteps().size();
+            //this.allowAllRemainingEdges = 1 == this.edgeFilter.getSteps().size();
         }
     }
 
-    public void compileFilters() {
+    /*public void compileFilters() {
         if (null != this.vertexFilter && !this.vertexFilter.isLocked())
             this.vertexFilter.applyStrategies();
         if (null != this.edgeFilter && !this.edgeFilter.isLocked())
             this.edgeFilter.applyStrategies();
-    }
+    }*/
 
     public boolean legalVertex(final Vertex vertex) {
         return null == this.vertexFilter || TraversalUtil.test(vertex, this.vertexFilter);
@@ -138,9 +137,20 @@ public final class GraphFilter implements Cloneable, Serializable {
             return Legal.MAYBE;
     }
 
+    public Legal checkEdgeLegality(final Direction direction) {
+        if (null == this.edgeFilter)
+            return Legal.YES;
+        else if (this.allowNoEdges)
+            return Legal.NO;
+        else if (!this.allowedEdgeDirection.equals(Direction.BOTH) && !this.allowedEdgeDirection.equals(direction))
+            return Legal.NO;
+        else
+            return Legal.MAYBE;
+    }
+
     @Override
     public int hashCode() {
-        return (null == this.edgeFilter ? 124 : this.edgeFilter.hashCode()) ^ (null == this.vertexFilter ? 875 : this.vertexFilter.hashCode());
+        return (null == this.edgeFilter ? 111 : this.edgeFilter.hashCode()) ^ (null == this.vertexFilter ? 222 : this.vertexFilter.hashCode());
     }
 
     @Override
@@ -180,67 +190,4 @@ public final class GraphFilter implements Cloneable, Serializable {
         else
             return "graphfilter[" + this.edgeFilter + "]";
     }
-
-    //////////////////////////////////////
-    /////////////////////////////////////
-    ////////////////////////////////////
-
-    public Optional<StarGraph> applyGraphFilter(final StarGraph graph) {
-        final Optional<StarGraph.StarVertex> filtered = this.applyGraphFilter(graph.getStarVertex());
-        return filtered.isPresent() ? Optional.of((StarGraph) filtered.get().graph()) : Optional.empty();
-    }
-
-    public Optional<StarGraph.StarVertex> applyGraphFilter(final StarGraph.StarVertex vertex) {
-        if (!this.hasFilter())
-            return Optional.of(vertex);
-        else if (null == vertex)
-            return Optional.empty();
-        else if (this.legalVertex(vertex)) {
-            if (this.hasEdgeFilter()) {
-                if (this.allowNoEdges) {
-                    vertex.dropEdges(Direction.BOTH);
-                } else {
-                    if (!this.allowedEdgeDirection.equals(Direction.BOTH))
-                        vertex.dropEdges(this.allowedEdgeDirection.opposite());
-                    if (!this.allowedEdgeLabels.isEmpty())
-                        vertex.keepEdges(this.allowedEdgeDirection, this.allowedEdgeLabels);
-
-                    if (!this.allowAllRemainingEdges) {
-                        final Map<String, List<Edge>> outEdges = new HashMap<>();
-                        final Map<String, List<Edge>> inEdges = new HashMap<>();
-                        this.legalEdges(vertex).forEachRemaining(edge -> {
-                            if (edge instanceof StarGraph.StarOutEdge) {
-                                List<Edge> edges = outEdges.get(edge.label());
-                                if (null == edges) {
-                                    edges = new ArrayList<>();
-                                    outEdges.put(edge.label(), edges);
-                                }
-                                edges.add(edge);
-                            } else {
-                                List<Edge> edges = inEdges.get(edge.label());
-                                if (null == edges) {
-                                    edges = new ArrayList<>();
-                                    inEdges.put(edge.label(), edges);
-                                }
-                                edges.add(edge);
-                            }
-                        });
-
-                        if (outEdges.isEmpty())
-                            vertex.dropEdges(Direction.OUT);
-                        else
-                            vertex.setEdges(Direction.OUT, outEdges);
-
-                        if (inEdges.isEmpty())
-                            vertex.dropEdges(Direction.IN);
-                        else
-                            vertex.setEdges(Direction.IN, inEdges);
-                    }
-                }
-            }
-            return Optional.of(vertex);
-        } else {
-            return Optional.empty();
-        }
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/482ccdad/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraph.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraph.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraph.java
index 5973df6..56a96fa 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraph.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraph.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.structure.util.star;
 import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.configuration.Configuration;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
 import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Element;
@@ -44,7 +45,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.Set;
+import java.util.Optional;
 import java.util.stream.Stream;
 
 /**
@@ -216,6 +217,13 @@ public final class StarGraph implements Graph, Serializable {
         return starGraph;
     }
 
+    public Optional<StarGraph> applyGraphFilter(final GraphFilter graphFilter) {
+        if (null == this.starVertex)
+            return Optional.empty();
+        final Optional<StarGraph.StarVertex> filtered = this.starVertex.applyGraphFilter(graphFilter);
+        return filtered.isPresent() ? Optional.of((StarGraph) filtered.get().graph()) : Optional.empty();
+    }
+
     ///////////////////////
     //// STAR ELEMENT ////
     //////////////////////
@@ -275,30 +283,6 @@ public final class StarGraph implements Graph, Serializable {
             super(id, label);
         }
 
-        public void setEdges(final Direction direction, final Map<String, List<Edge>> edges) {
-            if (direction.equals(Direction.OUT))
-                this.outEdges = edges;
-            else if (direction.equals(Direction.IN))
-                this.inEdges = edges;
-            else
-                throw new IllegalArgumentException("The following direction is not supported: " + direction);
-        }
-
-        public void keepEdges(final Direction direction, final Set<String> edgeLabels) {
-            final Set<String> dropLabels = new HashSet<>();
-            if ((direction.equals(Direction.OUT) || direction.equals(Direction.BOTH)) && null != this.outEdges)
-                dropLabels.addAll(this.outEdges.keySet());
-            if ((direction.equals(Direction.IN) || direction.equals(Direction.BOTH)) && null != this.inEdges)
-                dropLabels.addAll(this.inEdges.keySet());
-            //
-            for (final String label : edgeLabels) {
-                dropLabels.remove(label);
-            }
-            if (dropLabels.size() > 0) {
-                this.dropEdges(direction, dropLabels);
-            }
-        }
-
         public void dropEdges(final Direction direction) {
             if ((direction.equals(Direction.OUT) || direction.equals(Direction.BOTH)) && null != this.outEdges) {
                 this.outEdges.clear();
@@ -310,18 +294,15 @@ public final class StarGraph implements Graph, Serializable {
             }
         }
 
-        public void dropEdges(final Direction direction, final Set<String> edgeLabels) {
-            if ((direction.equals(Direction.OUT) || direction.equals(Direction.BOTH)) && null != this.outEdges) {
-                for (final String edgeLabel : edgeLabels) {
-                    this.outEdges.remove(edgeLabel);
-                }
+        public void dropEdges(final Direction direction, final String edgeLabel) {
+            if (null != this.outEdges && (direction.equals(Direction.OUT) || direction.equals(Direction.BOTH))) {
+                this.outEdges.remove(edgeLabel);
+
                 if (this.outEdges.isEmpty())
                     this.outEdges = null;
             }
-            if ((direction.equals(Direction.IN) || direction.equals(Direction.BOTH)) && null != this.inEdges) {
-                for (final String edgeLabel : edgeLabels) {
-                    this.inEdges.remove(edgeLabel);
-                }
+            if (null != this.inEdges && (direction.equals(Direction.IN) || direction.equals(Direction.BOTH))) {
+                this.inEdges.remove(edgeLabel);
 
                 if (this.inEdges.isEmpty())
                     this.inEdges = null;
@@ -451,6 +432,65 @@ public final class StarGraph implements Graph, Serializable {
                         .flatMap(entry -> entry.getValue().stream())
                         .iterator();
         }
+
+        ///////////////
+
+        public Optional<StarVertex> applyGraphFilter(final GraphFilter graphFilter) {
+            if (!graphFilter.hasFilter())
+                return Optional.of(this);
+            else if (graphFilter.legalVertex(this)) {
+                if (graphFilter.hasEdgeFilter()) {
+                    if (graphFilter.checkEdgeLegality(Direction.OUT).negative())
+                        this.dropEdges(Direction.OUT);
+                    if (graphFilter.checkEdgeLegality(Direction.IN).negative())
+                        this.dropEdges(Direction.IN);
+                    if (null != this.outEdges)
+                        for (final String key : new HashSet<>(this.outEdges.keySet())) {
+                            if (graphFilter.checkEdgeLegality(Direction.OUT, key).negative())
+                                this.dropEdges(Direction.OUT, key);
+                        }
+                    if (null != this.inEdges)
+                        for (final String key : new HashSet<>(this.inEdges.keySet())) {
+                            if (graphFilter.checkEdgeLegality(Direction.IN, key).negative())
+                                this.dropEdges(Direction.IN, key);
+                        }
+                    if (null != this.inEdges || null != this.outEdges) {
+                        final Map<String, List<Edge>> outEdges = new HashMap<>();
+                        final Map<String, List<Edge>> inEdges = new HashMap<>();
+                        graphFilter.legalEdges(this).forEachRemaining(edge -> {
+                            if (edge instanceof StarGraph.StarOutEdge) {
+                                List<Edge> edges = outEdges.get(edge.label());
+                                if (null == edges) {
+                                    edges = new ArrayList<>();
+                                    outEdges.put(edge.label(), edges);
+                                }
+                                edges.add(edge);
+                            } else {
+                                List<Edge> edges = inEdges.get(edge.label());
+                                if (null == edges) {
+                                    edges = new ArrayList<>();
+                                    inEdges.put(edge.label(), edges);
+                                }
+                                edges.add(edge);
+                            }
+                        });
+
+                        if (outEdges.isEmpty())
+                            this.dropEdges(Direction.OUT);
+                        else
+                            this.outEdges = outEdges;
+
+                        if (inEdges.isEmpty())
+                            this.dropEdges(Direction.IN);
+                        else
+                            this.inEdges = inEdges;
+                    }
+                }
+                return Optional.of(this);
+            } else {
+                return Optional.empty();
+            }
+        }
     }
 
     ///////////////////////////////
@@ -885,5 +925,4 @@ public final class StarGraph implements Graph, Serializable {
             return true;
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/482ccdad/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphGryoSerializer.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphGryoSerializer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphGryoSerializer.java
index 6316572..beafb33 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphGryoSerializer.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphGryoSerializer.java
@@ -123,7 +123,7 @@ public final class StarGraphGryoSerializer extends Serializer<StarGraph> {
                 }
             }
         }
-        return this.graphFilter.hasFilter() ? this.graphFilter.applyGraphFilter(starGraph).orElse(null) : starGraph;
+        return this.graphFilter.hasFilter() ? starGraph.applyGraphFilter(this.graphFilter).orElse(null) : starGraph;
     }
 
     private void writeEdges(final Kryo kryo, final Output output, final StarGraph starGraph, final Direction direction) {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/482ccdad/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
index 55d9dab..2ef531e 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
@@ -1488,6 +1488,16 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
     @Test
     @LoadGraphWith(MODERN)
     public void shouldSupportGraphFilter() throws Exception {
+        /// VERTEX PROGRAM
+        graph.compute(graphComputerClass.get()).vertices(__.hasLabel("software")).program(new VertexProgramM(VertexProgramM.SOFTWARE_ONLY)).submit().get();
+        graph.compute(graphComputerClass.get()).vertices(__.hasLabel("person")).program(new VertexProgramM(VertexProgramM.PEOPLE_ONLY)).submit().get();
+        graph.compute(graphComputerClass.get()).edges(__.bothE("knows")).program(new VertexProgramM(VertexProgramM.KNOWS_ONLY)).submit().get();
+        graph.compute(graphComputerClass.get()).vertices(__.hasLabel("person")).edges(__.bothE("knows")).program(new VertexProgramM(VertexProgramM.PEOPLE_KNOWS_ONLY)).submit().get();
+        graph.compute(graphComputerClass.get()).vertices(__.hasLabel("person")).edges(__.<Vertex>bothE("knows").has("weight", P.gt(0.5f))).program(new VertexProgramM(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).submit().get();
+        graph.compute(graphComputerClass.get()).edges(__.<Vertex>bothE().limit(0)).program(new VertexProgramM(VertexProgramM.VERTICES_ONLY)).submit().get();
+        graph.compute(graphComputerClass.get()).edges(__.<Vertex>outE().limit(1)).program(new VertexProgramM(VertexProgramM.ONE_OUT_EDGE_ONLY)).submit().get();
+        graph.compute(graphComputerClass.get()).edges(__.outE()).program(new VertexProgramM(VertexProgramM.OUT_EDGES_ONLY)).submit().get();
+
         /// VERTEX PROGRAM + MAP REDUCE
         graph.compute(graphComputerClass.get()).vertices(__.hasLabel("software")).program(new VertexProgramM(VertexProgramM.SOFTWARE_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.SOFTWARE_ONLY)).submit().get();
         graph.compute(graphComputerClass.get()).vertices(__.hasLabel("person")).program(new VertexProgramM(VertexProgramM.PEOPLE_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_ONLY)).submit().get();

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/482ccdad/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/GraphFilterRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/GraphFilterRecordReader.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/GraphFilterRecordReader.java
index e2114ef..954dd44 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/GraphFilterRecordReader.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/GraphFilterRecordReader.java
@@ -66,7 +66,7 @@ public final class GraphFilterRecordReader extends RecordReader<NullWritable, Ve
             while (true) {
                 if (this.recordReader.nextKeyValue()) {
                     final VertexWritable vertexWritable = this.recordReader.getCurrentValue();
-                    final Optional<StarGraph.StarVertex> vertex = this.graphFilter.applyGraphFilter(vertexWritable.get());
+                    final Optional<StarGraph.StarVertex> vertex = vertexWritable.get().applyGraphFilter(this.graphFilter);
                     if (vertex.isPresent()) {
                         vertexWritable.set(vertex.get());
                         return true;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/482ccdad/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java
index 5cf5d1e..e871269 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java
@@ -72,8 +72,10 @@ public final class MapReduceHelper {
             final Optional<Comparator<?>> reduceSort = mapReduce.getReduceKeySort();
 
             newConfiguration.setClass(Constants.GREMLIN_HADOOP_MAP_REDUCE_CLASS, mapReduce.getClass(), MapReduce.class);
-            if (vertexProgramExists)
+            if (vertexProgramExists) {
                 newConfiguration.set(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputOutputHelper.getInputFormat((Class) newConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class)).getCanonicalName());
+                newConfiguration.unset(Constants.GREMLIN_HADOOP_GRAPH_FILTER);
+            }
             final Job job = Job.getInstance(newConfiguration, mapReduce.toString());
             HadoopGraph.LOGGER.info(Constants.GREMLIN_HADOOP_JOB_PREFIX + mapReduce.toString());
             job.setJarByClass(HadoopGraph.class);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/482ccdad/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java
index 69cc567..80a99fd 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java
@@ -92,7 +92,7 @@ public final class ScriptRecordReader extends RecordReader<NullWritable, VertexW
                 final Bindings bindings = this.engine.createBindings();
                 bindings.put(LINE, this.lineRecordReader.getCurrentValue().toString());
                 bindings.put(FACTORY, new ScriptElementFactory());
-                final Optional<StarGraph.StarVertex> vertex = this.graphFilter.applyGraphFilter((StarGraph.StarVertex) engine.eval(READ_CALL, bindings));
+                final Optional<StarGraph.StarVertex> vertex = ((StarGraph.StarVertex) engine.eval(READ_CALL, bindings)).applyGraphFilter(this.graphFilter);
                 if (vertex.isPresent()) {
                     this.vertexWritable.set(vertex.get());
                     return true;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/482ccdad/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index 65a69a2..9b62eb6 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -62,7 +62,7 @@ public final class SparkExecutor {
     public static JavaPairRDD<Object, VertexWritable> applyGraphFilter(JavaPairRDD<Object, VertexWritable> graphRDD, final GraphFilter graphFilter) {
         return graphRDD.mapPartitionsToPair(partitionIterator -> {
             final GraphFilter gFilter = graphFilter.clone();
-            return () -> IteratorUtils.filter(partitionIterator, tuple -> gFilter.applyGraphFilter(tuple._2().get()).isPresent());
+            return () -> IteratorUtils.filter(partitionIterator, tuple -> (tuple._2().get().applyGraphFilter(gFilter)).isPresent());
         }, true);
     }