You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/02/06 20:54:59 UTC
[28/30] incubator-tinkerpop git commit: StarGraph is no longer
special to GraphFilter. All the filtering methods for StarGraph that were
weirdly in GraphFilter are now just in StarGraph -- i.e.
StarGraph.applyGraphFilter(GraphFilter). We may want to crea
StarGraph is no longer special to GraphFilter. All the filtering methods for StarGraph that were weirdly in GraphFilter are now just in StarGraph -- i.e. StarGraph.applyGraphFilter(GraphFilter). We may want to create an interface moving forward, like Filterable, which requires an applyGraphFilter(GraphFilter) implementation. Added more test cases to GraphComputerTest -- we now test graph filtering for VERTEX_PROGRAM, VERTEX_PROGRAM+MAP_REDUCE, and MAP_REDUCE. A slight optimization to MapReduceHelper (Giraph): if there was a VERTEX_PROGRAM, there is no need to use a GraphFilter, as the graph has already been filtered by the VERTEX_PROGRAM InputFormat.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/482ccdad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/482ccdad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/482ccdad
Branch: refs/heads/master
Commit: 482ccdad79105871582e464217d363f735b87611
Parents: 4134254
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Feb 5 09:50:26 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Feb 5 09:50:26 2016 -0700
----------------------------------------------------------------------
.../gremlin/process/computer/GraphFilter.java | 97 ++++-------------
.../gremlin/structure/util/star/StarGraph.java | 109 +++++++++++++------
.../util/star/StarGraphGryoSerializer.java | 2 +-
.../process/computer/GraphComputerTest.java | 10 ++
.../computer/GraphFilterRecordReader.java | 2 +-
.../process/computer/util/MapReduceHelper.java | 4 +-
.../structure/io/script/ScriptRecordReader.java | 2 +-
.../spark/process/computer/SparkExecutor.java | 2 +-
8 files changed, 113 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/482ccdad/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphFilter.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphFilter.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphFilter.java
index d5d4186..0fb700a 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphFilter.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphFilter.java
@@ -27,17 +27,11 @@ import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
import java.util.Set;
/**
@@ -45,7 +39,8 @@ import java.util.Set;
* There are two types of filters: a {@link Vertex} filter and an {@link Edge} filter.
* The vertex filter is a {@link Traversal} that can only check the id, label, and properties of the vertex.
* The edge filter is a {@link Traversal} that starts at the vertex are emits all legal incident edges.
- * The use of GraphFilter can greatly reduce the amount of data processed by the {@link GraphComputer}.
+ * If no vertex filter is provided, then no vertices are filtered. If no edge filter is provided, then no edges are filtered.
+ * The use of a GraphFilter can greatly reduce the amount of data processed by the {@link GraphComputer}.
* For instance, for {@code g.V().count()}, there is no reason to load edges, and thus, the edge filter can be {@code bothE().limit(0)}.
*
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -58,6 +53,10 @@ public final class GraphFilter implements Cloneable, Serializable {
public boolean positive() {
return this != NO;
}
+
+ public boolean negative() {
+ return this == NO;
+ }
}
private Traversal.Admin<Vertex, Vertex> vertexFilter = null;
@@ -66,7 +65,7 @@ public final class GraphFilter implements Cloneable, Serializable {
private boolean allowNoEdges = false;
private Direction allowedEdgeDirection = Direction.BOTH;
private Set<String> allowedEdgeLabels = new HashSet<>();
- private boolean allowAllRemainingEdges = false;
+ //private boolean allowAllRemainingEdges = false;
public void setVertexFilter(final Traversal<Vertex, Vertex> vertexFilter) {
if (!TraversalHelper.isLocalVertex(vertexFilter.asAdmin()))
@@ -84,16 +83,16 @@ public final class GraphFilter implements Cloneable, Serializable {
this.allowedEdgeLabels.clear();
this.allowedEdgeLabels.addAll(Arrays.asList(((VertexStep) this.edgeFilter.getStartStep()).getEdgeLabels()));
this.allowedEdgeDirection = ((VertexStep) this.edgeFilter.getStartStep()).getDirection();
- this.allowAllRemainingEdges = 1 == this.edgeFilter.getSteps().size();
+ //this.allowAllRemainingEdges = 1 == this.edgeFilter.getSteps().size();
}
}
- public void compileFilters() {
+ /*public void compileFilters() {
if (null != this.vertexFilter && !this.vertexFilter.isLocked())
this.vertexFilter.applyStrategies();
if (null != this.edgeFilter && !this.edgeFilter.isLocked())
this.edgeFilter.applyStrategies();
- }
+ }*/
public boolean legalVertex(final Vertex vertex) {
return null == this.vertexFilter || TraversalUtil.test(vertex, this.vertexFilter);
@@ -138,9 +137,20 @@ public final class GraphFilter implements Cloneable, Serializable {
return Legal.MAYBE;
}
+ public Legal checkEdgeLegality(final Direction direction) {
+ if (null == this.edgeFilter)
+ return Legal.YES;
+ else if (this.allowNoEdges)
+ return Legal.NO;
+ else if (!this.allowedEdgeDirection.equals(Direction.BOTH) && !this.allowedEdgeDirection.equals(direction))
+ return Legal.NO;
+ else
+ return Legal.MAYBE;
+ }
+
@Override
public int hashCode() {
- return (null == this.edgeFilter ? 124 : this.edgeFilter.hashCode()) ^ (null == this.vertexFilter ? 875 : this.vertexFilter.hashCode());
+ return (null == this.edgeFilter ? 111 : this.edgeFilter.hashCode()) ^ (null == this.vertexFilter ? 222 : this.vertexFilter.hashCode());
}
@Override
@@ -180,67 +190,4 @@ public final class GraphFilter implements Cloneable, Serializable {
else
return "graphfilter[" + this.edgeFilter + "]";
}
-
- //////////////////////////////////////
- /////////////////////////////////////
- ////////////////////////////////////
-
- public Optional<StarGraph> applyGraphFilter(final StarGraph graph) {
- final Optional<StarGraph.StarVertex> filtered = this.applyGraphFilter(graph.getStarVertex());
- return filtered.isPresent() ? Optional.of((StarGraph) filtered.get().graph()) : Optional.empty();
- }
-
- public Optional<StarGraph.StarVertex> applyGraphFilter(final StarGraph.StarVertex vertex) {
- if (!this.hasFilter())
- return Optional.of(vertex);
- else if (null == vertex)
- return Optional.empty();
- else if (this.legalVertex(vertex)) {
- if (this.hasEdgeFilter()) {
- if (this.allowNoEdges) {
- vertex.dropEdges(Direction.BOTH);
- } else {
- if (!this.allowedEdgeDirection.equals(Direction.BOTH))
- vertex.dropEdges(this.allowedEdgeDirection.opposite());
- if (!this.allowedEdgeLabels.isEmpty())
- vertex.keepEdges(this.allowedEdgeDirection, this.allowedEdgeLabels);
-
- if (!this.allowAllRemainingEdges) {
- final Map<String, List<Edge>> outEdges = new HashMap<>();
- final Map<String, List<Edge>> inEdges = new HashMap<>();
- this.legalEdges(vertex).forEachRemaining(edge -> {
- if (edge instanceof StarGraph.StarOutEdge) {
- List<Edge> edges = outEdges.get(edge.label());
- if (null == edges) {
- edges = new ArrayList<>();
- outEdges.put(edge.label(), edges);
- }
- edges.add(edge);
- } else {
- List<Edge> edges = inEdges.get(edge.label());
- if (null == edges) {
- edges = new ArrayList<>();
- inEdges.put(edge.label(), edges);
- }
- edges.add(edge);
- }
- });
-
- if (outEdges.isEmpty())
- vertex.dropEdges(Direction.OUT);
- else
- vertex.setEdges(Direction.OUT, outEdges);
-
- if (inEdges.isEmpty())
- vertex.dropEdges(Direction.IN);
- else
- vertex.setEdges(Direction.IN, inEdges);
- }
- }
- }
- return Optional.of(vertex);
- } else {
- return Optional.empty();
- }
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/482ccdad/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraph.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraph.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraph.java
index 5973df6..56a96fa 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraph.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraph.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.structure.util.star;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Element;
@@ -44,7 +45,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
-import java.util.Set;
+import java.util.Optional;
import java.util.stream.Stream;
/**
@@ -216,6 +217,13 @@ public final class StarGraph implements Graph, Serializable {
return starGraph;
}
+ public Optional<StarGraph> applyGraphFilter(final GraphFilter graphFilter) {
+ if (null == this.starVertex)
+ return Optional.empty();
+ final Optional<StarGraph.StarVertex> filtered = this.starVertex.applyGraphFilter(graphFilter);
+ return filtered.isPresent() ? Optional.of((StarGraph) filtered.get().graph()) : Optional.empty();
+ }
+
///////////////////////
//// STAR ELEMENT ////
//////////////////////
@@ -275,30 +283,6 @@ public final class StarGraph implements Graph, Serializable {
super(id, label);
}
- public void setEdges(final Direction direction, final Map<String, List<Edge>> edges) {
- if (direction.equals(Direction.OUT))
- this.outEdges = edges;
- else if (direction.equals(Direction.IN))
- this.inEdges = edges;
- else
- throw new IllegalArgumentException("The following direction is not supported: " + direction);
- }
-
- public void keepEdges(final Direction direction, final Set<String> edgeLabels) {
- final Set<String> dropLabels = new HashSet<>();
- if ((direction.equals(Direction.OUT) || direction.equals(Direction.BOTH)) && null != this.outEdges)
- dropLabels.addAll(this.outEdges.keySet());
- if ((direction.equals(Direction.IN) || direction.equals(Direction.BOTH)) && null != this.inEdges)
- dropLabels.addAll(this.inEdges.keySet());
- //
- for (final String label : edgeLabels) {
- dropLabels.remove(label);
- }
- if (dropLabels.size() > 0) {
- this.dropEdges(direction, dropLabels);
- }
- }
-
public void dropEdges(final Direction direction) {
if ((direction.equals(Direction.OUT) || direction.equals(Direction.BOTH)) && null != this.outEdges) {
this.outEdges.clear();
@@ -310,18 +294,15 @@ public final class StarGraph implements Graph, Serializable {
}
}
- public void dropEdges(final Direction direction, final Set<String> edgeLabels) {
- if ((direction.equals(Direction.OUT) || direction.equals(Direction.BOTH)) && null != this.outEdges) {
- for (final String edgeLabel : edgeLabels) {
- this.outEdges.remove(edgeLabel);
- }
+ public void dropEdges(final Direction direction, final String edgeLabel) {
+ if (null != this.outEdges && (direction.equals(Direction.OUT) || direction.equals(Direction.BOTH))) {
+ this.outEdges.remove(edgeLabel);
+
if (this.outEdges.isEmpty())
this.outEdges = null;
}
- if ((direction.equals(Direction.IN) || direction.equals(Direction.BOTH)) && null != this.inEdges) {
- for (final String edgeLabel : edgeLabels) {
- this.inEdges.remove(edgeLabel);
- }
+ if (null != this.inEdges && (direction.equals(Direction.IN) || direction.equals(Direction.BOTH))) {
+ this.inEdges.remove(edgeLabel);
if (this.inEdges.isEmpty())
this.inEdges = null;
@@ -451,6 +432,65 @@ public final class StarGraph implements Graph, Serializable {
.flatMap(entry -> entry.getValue().stream())
.iterator();
}
+
+ ///////////////
+
+ public Optional<StarVertex> applyGraphFilter(final GraphFilter graphFilter) {
+ if (!graphFilter.hasFilter())
+ return Optional.of(this);
+ else if (graphFilter.legalVertex(this)) {
+ if (graphFilter.hasEdgeFilter()) {
+ if (graphFilter.checkEdgeLegality(Direction.OUT).negative())
+ this.dropEdges(Direction.OUT);
+ if (graphFilter.checkEdgeLegality(Direction.IN).negative())
+ this.dropEdges(Direction.IN);
+ if (null != this.outEdges)
+ for (final String key : new HashSet<>(this.outEdges.keySet())) {
+ if (graphFilter.checkEdgeLegality(Direction.OUT, key).negative())
+ this.dropEdges(Direction.OUT, key);
+ }
+ if (null != this.inEdges)
+ for (final String key : new HashSet<>(this.inEdges.keySet())) {
+ if (graphFilter.checkEdgeLegality(Direction.IN, key).negative())
+ this.dropEdges(Direction.IN, key);
+ }
+ if (null != this.inEdges || null != this.outEdges) {
+ final Map<String, List<Edge>> outEdges = new HashMap<>();
+ final Map<String, List<Edge>> inEdges = new HashMap<>();
+ graphFilter.legalEdges(this).forEachRemaining(edge -> {
+ if (edge instanceof StarGraph.StarOutEdge) {
+ List<Edge> edges = outEdges.get(edge.label());
+ if (null == edges) {
+ edges = new ArrayList<>();
+ outEdges.put(edge.label(), edges);
+ }
+ edges.add(edge);
+ } else {
+ List<Edge> edges = inEdges.get(edge.label());
+ if (null == edges) {
+ edges = new ArrayList<>();
+ inEdges.put(edge.label(), edges);
+ }
+ edges.add(edge);
+ }
+ });
+
+ if (outEdges.isEmpty())
+ this.dropEdges(Direction.OUT);
+ else
+ this.outEdges = outEdges;
+
+ if (inEdges.isEmpty())
+ this.dropEdges(Direction.IN);
+ else
+ this.inEdges = inEdges;
+ }
+ }
+ return Optional.of(this);
+ } else {
+ return Optional.empty();
+ }
+ }
}
///////////////////////////////
@@ -885,5 +925,4 @@ public final class StarGraph implements Graph, Serializable {
return true;
}
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/482ccdad/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphGryoSerializer.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphGryoSerializer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphGryoSerializer.java
index 6316572..beafb33 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphGryoSerializer.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphGryoSerializer.java
@@ -123,7 +123,7 @@ public final class StarGraphGryoSerializer extends Serializer<StarGraph> {
}
}
}
- return this.graphFilter.hasFilter() ? this.graphFilter.applyGraphFilter(starGraph).orElse(null) : starGraph;
+ return this.graphFilter.hasFilter() ? starGraph.applyGraphFilter(this.graphFilter).orElse(null) : starGraph;
}
private void writeEdges(final Kryo kryo, final Output output, final StarGraph starGraph, final Direction direction) {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/482ccdad/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
index 55d9dab..2ef531e 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
@@ -1488,6 +1488,16 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
@Test
@LoadGraphWith(MODERN)
public void shouldSupportGraphFilter() throws Exception {
+ /// VERTEX PROGRAM
+ graph.compute(graphComputerClass.get()).vertices(__.hasLabel("software")).program(new VertexProgramM(VertexProgramM.SOFTWARE_ONLY)).submit().get();
+ graph.compute(graphComputerClass.get()).vertices(__.hasLabel("person")).program(new VertexProgramM(VertexProgramM.PEOPLE_ONLY)).submit().get();
+ graph.compute(graphComputerClass.get()).edges(__.bothE("knows")).program(new VertexProgramM(VertexProgramM.KNOWS_ONLY)).submit().get();
+ graph.compute(graphComputerClass.get()).vertices(__.hasLabel("person")).edges(__.bothE("knows")).program(new VertexProgramM(VertexProgramM.PEOPLE_KNOWS_ONLY)).submit().get();
+ graph.compute(graphComputerClass.get()).vertices(__.hasLabel("person")).edges(__.<Vertex>bothE("knows").has("weight", P.gt(0.5f))).program(new VertexProgramM(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).submit().get();
+ graph.compute(graphComputerClass.get()).edges(__.<Vertex>bothE().limit(0)).program(new VertexProgramM(VertexProgramM.VERTICES_ONLY)).submit().get();
+ graph.compute(graphComputerClass.get()).edges(__.<Vertex>outE().limit(1)).program(new VertexProgramM(VertexProgramM.ONE_OUT_EDGE_ONLY)).submit().get();
+ graph.compute(graphComputerClass.get()).edges(__.outE()).program(new VertexProgramM(VertexProgramM.OUT_EDGES_ONLY)).submit().get();
+
/// VERTEX PROGRAM + MAP REDUCE
graph.compute(graphComputerClass.get()).vertices(__.hasLabel("software")).program(new VertexProgramM(VertexProgramM.SOFTWARE_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.SOFTWARE_ONLY)).submit().get();
graph.compute(graphComputerClass.get()).vertices(__.hasLabel("person")).program(new VertexProgramM(VertexProgramM.PEOPLE_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_ONLY)).submit().get();
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/482ccdad/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/GraphFilterRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/GraphFilterRecordReader.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/GraphFilterRecordReader.java
index e2114ef..954dd44 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/GraphFilterRecordReader.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/GraphFilterRecordReader.java
@@ -66,7 +66,7 @@ public final class GraphFilterRecordReader extends RecordReader<NullWritable, Ve
while (true) {
if (this.recordReader.nextKeyValue()) {
final VertexWritable vertexWritable = this.recordReader.getCurrentValue();
- final Optional<StarGraph.StarVertex> vertex = this.graphFilter.applyGraphFilter(vertexWritable.get());
+ final Optional<StarGraph.StarVertex> vertex = vertexWritable.get().applyGraphFilter(this.graphFilter);
if (vertex.isPresent()) {
vertexWritable.set(vertex.get());
return true;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/482ccdad/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java
index 5cf5d1e..e871269 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java
@@ -72,8 +72,10 @@ public final class MapReduceHelper {
final Optional<Comparator<?>> reduceSort = mapReduce.getReduceKeySort();
newConfiguration.setClass(Constants.GREMLIN_HADOOP_MAP_REDUCE_CLASS, mapReduce.getClass(), MapReduce.class);
- if (vertexProgramExists)
+ if (vertexProgramExists) {
newConfiguration.set(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputOutputHelper.getInputFormat((Class) newConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class)).getCanonicalName());
+ newConfiguration.unset(Constants.GREMLIN_HADOOP_GRAPH_FILTER);
+ }
final Job job = Job.getInstance(newConfiguration, mapReduce.toString());
HadoopGraph.LOGGER.info(Constants.GREMLIN_HADOOP_JOB_PREFIX + mapReduce.toString());
job.setJarByClass(HadoopGraph.class);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/482ccdad/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java
index 69cc567..80a99fd 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java
@@ -92,7 +92,7 @@ public final class ScriptRecordReader extends RecordReader<NullWritable, VertexW
final Bindings bindings = this.engine.createBindings();
bindings.put(LINE, this.lineRecordReader.getCurrentValue().toString());
bindings.put(FACTORY, new ScriptElementFactory());
- final Optional<StarGraph.StarVertex> vertex = this.graphFilter.applyGraphFilter((StarGraph.StarVertex) engine.eval(READ_CALL, bindings));
+ final Optional<StarGraph.StarVertex> vertex = ((StarGraph.StarVertex) engine.eval(READ_CALL, bindings)).applyGraphFilter(this.graphFilter);
if (vertex.isPresent()) {
this.vertexWritable.set(vertex.get());
return true;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/482ccdad/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index 65a69a2..9b62eb6 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -62,7 +62,7 @@ public final class SparkExecutor {
public static JavaPairRDD<Object, VertexWritable> applyGraphFilter(JavaPairRDD<Object, VertexWritable> graphRDD, final GraphFilter graphFilter) {
return graphRDD.mapPartitionsToPair(partitionIterator -> {
final GraphFilter gFilter = graphFilter.clone();
- return () -> IteratorUtils.filter(partitionIterator, tuple -> gFilter.applyGraphFilter(tuple._2().get()).isPresent());
+ return () -> IteratorUtils.filter(partitionIterator, tuple -> (tuple._2().get().applyGraphFilter(gFilter)).isPresent());
}, true);
}