You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/02/06 20:54:36 UTC

[05/30] incubator-tinkerpop git commit: GiraphGraphComputer now has support for vertexFilters and edgeFilters. Consolidated a bunch of code to make it easy for future InputFormats to be GraphFilterAware. Will most likely make a filterMap so variables are

GiraphGraphComputer now has support for vertexFilters and edgeFilters. Consolidated a bunch of code to make it easy for future InputFormats to be GraphFilterAware. Will most likely make a filterMap so variables are bundled nicely.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/64c68406
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/64c68406
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/64c68406

Branch: refs/heads/master
Commit: 64c684065143b75697ccac755b9dfbf943c8c54c
Parents: 77732dd
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Feb 1 16:46:07 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Feb 1 16:46:07 2016 -0700

----------------------------------------------------------------------
 .../process/computer/GiraphGraphComputer.java   |  5 +++
 .../structure/io/GiraphVertexInputFormat.java   | 20 ++++++++++-
 .../giraph/structure/io/GiraphVertexReader.java | 29 ++++++++++++++--
 .../tinkerpop/gremlin/hadoop/Constants.java     |  4 +--
 .../structure/io/CommonFileInputFormat.java     | 21 ------------
 .../hadoop/structure/io/GraphFilterAware.java   | 35 ++++++++++++++++++++
 .../structure/io/gryo/GryoRecordReader.java     |  4 +--
 .../structure/io/script/ScriptRecordReader.java |  4 +--
 .../process/computer/SparkGraphComputer.java    |  2 +-
 .../spark/structure/io/InputFormatRDD.java      | 15 ---------
 10 files changed, 93 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/64c68406/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index 18bb25e..2503838 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -42,6 +42,7 @@ import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.ComputerSubmiss
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.MapReduceHelper;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.GraphFilterAware;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.InputOutputHelper;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
@@ -143,6 +144,10 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
         storage.rm(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION));
         this.giraphConfiguration.setBoolean(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, this.persist.equals(Persist.EDGES));
         try {
+            // store vertex and edge filters (will propagate down to native InputFormat or else GiraphVertexInputFormat will process)
+            final Configuration apacheConfiguration = new BaseConfiguration();
+            GraphFilterAware.storeVertexAndEdgeFilters(apacheConfiguration, this.giraphConfiguration, this.vertexFilter, this.edgeFilter);
+
             // it is possible to run graph computer without a vertex program (and thus, only map reduce jobs if they exist)
             if (null != this.vertexProgram) {
                 // a way to verify in Giraph whether the traversal will go over the wire or not

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/64c68406/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexInputFormat.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexInputFormat.java
index 6b6638a..cb802fb 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexInputFormat.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexInputFormat.java
@@ -28,7 +28,13 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.GraphFilterAware;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
 
 import java.io.IOException;
 import java.util.List;
@@ -39,6 +45,10 @@ import java.util.List;
 public final class GiraphVertexInputFormat extends VertexInputFormat {
 
     private InputFormat<NullWritable, VertexWritable> hadoopGraphInputFormat;
+    protected Traversal.Admin<Vertex, Vertex> vertexFilter = null;
+    protected Traversal.Admin<Edge, Edge> edgeFilter = null;
+    private boolean filtersLoader = false;
+    private boolean graphFilterAware;
 
     @Override
     public void checkInputSpecs(final Configuration configuration) {
@@ -55,7 +65,7 @@ public final class GiraphVertexInputFormat extends VertexInputFormat {
     public VertexReader createVertexReader(final InputSplit split, final TaskAttemptContext context) throws IOException {
         this.constructor(context.getConfiguration());
         try {
-            return new GiraphVertexReader(this.hadoopGraphInputFormat.createRecordReader(split, context));
+            return new GiraphVertexReader(this.hadoopGraphInputFormat.createRecordReader(split, context), this.graphFilterAware, this.vertexFilter, this.edgeFilter);
         } catch (InterruptedException e) {
             throw new IOException(e);
         }
@@ -64,6 +74,14 @@ public final class GiraphVertexInputFormat extends VertexInputFormat {
     private final void constructor(final Configuration configuration) {
         if (null == this.hadoopGraphInputFormat) {
             this.hadoopGraphInputFormat = ReflectionUtils.newInstance(configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class, InputFormat.class), configuration);
+            this.graphFilterAware = this.hadoopGraphInputFormat instanceof GraphFilterAware;
+            if (!this.filtersLoader) {
+                if (configuration.get(Constants.GREMLIN_HADOOP_VERTEX_FILTER, null) != null)
+                    this.vertexFilter = VertexProgramHelper.deserialize(ConfUtil.makeApacheConfiguration(configuration), Constants.GREMLIN_HADOOP_VERTEX_FILTER);
+                if (configuration.get(Constants.GREMLIN_HADOOP_EDGE_FILTER, null) != null)
+                    this.edgeFilter = VertexProgramHelper.deserialize(ConfUtil.makeApacheConfiguration(configuration), Constants.GREMLIN_HADOOP_EDGE_FILTER);
+                this.filtersLoader = true;
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/64c68406/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexReader.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexReader.java
index 2a6ade6..b4a81fb 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexReader.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexReader.java
@@ -25,7 +25,11 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.tinkerpop.gremlin.giraph.process.computer.GiraphVertex;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.CommonFileInputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.GraphFilterAware;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.structure.Edge;
 
 import java.io.IOException;
 
@@ -35,9 +39,15 @@ import java.io.IOException;
 public final class GiraphVertexReader extends VertexReader {
 
     private RecordReader<NullWritable, VertexWritable> recordReader;
+    private final Traversal.Admin<org.apache.tinkerpop.gremlin.structure.Vertex, org.apache.tinkerpop.gremlin.structure.Vertex> vertexFilter;
+    private final Traversal.Admin<Edge, Edge> edgeFilter;
+    private final boolean graphFilterAware;
 
-    public GiraphVertexReader(final RecordReader<NullWritable, VertexWritable> recordReader) {
+    public GiraphVertexReader(final RecordReader<NullWritable, VertexWritable> recordReader, final boolean graphFilterAware, final Traversal.Admin<org.apache.tinkerpop.gremlin.structure.Vertex, org.apache.tinkerpop.gremlin.structure.Vertex> vertexFilter, final Traversal.Admin<Edge, Edge> edgeFilter) {
         this.recordReader = recordReader;
+        this.graphFilterAware = graphFilterAware;
+        this.vertexFilter = null == vertexFilter ? null : vertexFilter.clone();
+        this.edgeFilter = null == edgeFilter ? null : edgeFilter.clone();
     }
 
     @Override
@@ -47,7 +57,22 @@ public final class GiraphVertexReader extends VertexReader {
 
     @Override
     public boolean nextVertex() throws IOException, InterruptedException {
-        return this.recordReader.nextKeyValue();
+        if (this.graphFilterAware) {
+            return this.recordReader.nextKeyValue();
+        } else {
+            while (true) {
+                if (this.recordReader.nextKeyValue()) {
+                    final VertexWritable vertexWritable = this.recordReader.getCurrentValue();
+                    final org.apache.tinkerpop.gremlin.structure.Vertex vertex = GraphFilterAware.applyVertexAndEdgeFilters(vertexWritable.get(), this.vertexFilter, this.edgeFilter);
+                    if (null != vertex) {
+                        vertexWritable.set(vertex);
+                        return true;
+                    }
+                } else {
+                    return false;
+                }
+            }
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/64c68406/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
index e404570..baca3d7 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
@@ -35,8 +35,8 @@ public final class Constants {
     public static final String GREMLIN_HADOOP_OUTPUT_LOCATION = "gremlin.hadoop.outputLocation";
     public static final String GREMLIN_HADOOP_GRAPH_INPUT_FORMAT = "gremlin.hadoop.graphInputFormat";
     public static final String GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT = "gremlin.hadoop.graphOutputFormat";
-    public static final String GREMLIN_HADOOP_GRAPH_INPUT_FORMAT_HAS_EDGES = "gremlin.hadoop.graphOutputFormat.hasEdges";
-    public static final String GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES = "gremlin.hadoop.graphInputFormat.hasEdges";
+    public static final String GREMLIN_HADOOP_GRAPH_INPUT_FORMAT_HAS_EDGES = "gremlin.hadoop.graphInputFormat.hasEdges";
+    public static final String GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES = "gremlin.hadoop.graphOutputFormat.hasEdges";
 
     public static final String GREMLIN_HADOOP_VERTEX_FILTER = "gremlin.hadoop.vertexFilter";
     public static final String GREMLIN_HADOOP_EDGE_FILTER = "gremlin.hadoop.edgeFilter";

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/64c68406/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileInputFormat.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileInputFormat.java
index 679fd68..ddb10c3 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileInputFormat.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileInputFormat.java
@@ -29,13 +29,9 @@ import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
-import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 
-import java.util.Iterator;
-
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
@@ -69,21 +65,4 @@ public abstract class CommonFileInputFormat extends FileInputFormat<NullWritable
     public void setEdgeFilter(final Traversal<Edge, Edge> edgeFilter) {
         // do nothing. loaded through configuration.
     }
-
-    public static final Vertex applyVertexAndEdgeFilters(final Vertex vertex, final Traversal.Admin<Vertex, Vertex> vertexFilter, final Traversal.Admin<Edge, Edge> edgeFilter) {
-        if (null == vertex)
-            return null;
-        else if (vertexFilter == null || TraversalUtil.test(vertex, vertexFilter)) {
-            if (edgeFilter != null) {
-                final Iterator<Edge> edgeIterator = vertex.edges(Direction.BOTH);
-                while (edgeIterator.hasNext()) {
-                    if (!TraversalUtil.test(edgeIterator.next(), edgeFilter))
-                        edgeIterator.remove();
-                }
-            }
-            return vertex;
-        } else {
-            return null;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/64c68406/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/GraphFilterAware.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/GraphFilterAware.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/GraphFilterAware.java
index a840523..5bb9538 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/GraphFilterAware.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/GraphFilterAware.java
@@ -19,10 +19,17 @@
 
 package org.apache.tinkerpop.gremlin.hadoop.structure.io;
 
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
+import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 
+import java.util.Iterator;
+
 /**
  * An input graph class is {@code GraphFilterAware} if it can filter out vertices and edges as its loading the graph from the
  * source data. Any input class that is {@code GraphFilterAware} must be able to fully handle both vertex and edge filtering.
@@ -36,4 +43,32 @@ public interface GraphFilterAware {
     public void setVertexFilter(final Traversal<Vertex, Vertex> vertexFilter);
 
     public void setEdgeFilter(final Traversal<Edge, Edge> edgeFilter);
+
+    public static Vertex applyVertexAndEdgeFilters(final Vertex vertex, final Traversal.Admin<Vertex, Vertex> vertexFilter, final Traversal.Admin<Edge, Edge> edgeFilter) {
+        if (null == vertex)
+            return null;
+        else if (vertexFilter == null || TraversalUtil.test(vertex, vertexFilter)) {
+            if (edgeFilter != null) {
+                final Iterator<Edge> edgeIterator = vertex.edges(Direction.BOTH);
+                while (edgeIterator.hasNext()) {
+                    if (!TraversalUtil.test(edgeIterator.next(), edgeFilter))
+                        edgeIterator.remove();
+                }
+            }
+            return vertex;
+        } else {
+            return null;
+        }
+    }
+
+    public static void storeVertexAndEdgeFilters(final Configuration apacheConfiguration, final org.apache.hadoop.conf.Configuration hadoopConfiguration, final Traversal.Admin<Vertex, Vertex> vertexFilter, final Traversal.Admin<Edge, Edge> edgeFilter) {
+        if (null != vertexFilter) {
+            VertexProgramHelper.serialize(vertexFilter, apacheConfiguration, Constants.GREMLIN_HADOOP_VERTEX_FILTER);
+            hadoopConfiguration.set(Constants.GREMLIN_HADOOP_VERTEX_FILTER, apacheConfiguration.getString(Constants.GREMLIN_HADOOP_VERTEX_FILTER));
+        }
+        if (null != edgeFilter) {
+            VertexProgramHelper.serialize(edgeFilter, apacheConfiguration, Constants.GREMLIN_HADOOP_EDGE_FILTER);
+            hadoopConfiguration.set(Constants.GREMLIN_HADOOP_EDGE_FILTER, apacheConfiguration.getString(Constants.GREMLIN_HADOOP_EDGE_FILTER));
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/64c68406/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
index ad4f936..82e85f8 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.CommonFileInputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.GraphFilterAware;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
@@ -131,7 +131,7 @@ public final class GryoRecordReader extends RecordReader<NullWritable, VertexWri
             terminatorLocation = ((byte) currentByte) == TERMINATOR[terminatorLocation] ? terminatorLocation + 1 : 0;
             if (terminatorLocation >= TERMINATOR.length) {
                 try (InputStream in = new ByteArrayInputStream(output.toByteArray())) {
-                    final Vertex vertex = CommonFileInputFormat.applyVertexAndEdgeFilters(this.gryoReader.readVertex(in, Attachable::get), this.vertexFilter, this.edgeFilter); // I know how GryoReader works, so I'm cheating here
+                    final Vertex vertex = GraphFilterAware.applyVertexAndEdgeFilters(this.gryoReader.readVertex(in, Attachable::get), this.vertexFilter, this.edgeFilter); // I know how GryoReader works, so I'm cheating here
                     if (null != vertex) {
                         this.vertexWritable.set(vertex);
                         return true;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/64c68406/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java
index 44d60b4..34aa138 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
 import org.apache.tinkerpop.gremlin.groovy.CompilerCustomizerProvider;
 import org.apache.tinkerpop.gremlin.groovy.DefaultImportCustomizerProvider;
 import org.apache.tinkerpop.gremlin.groovy.jsr223.GremlinGroovyScriptEngine;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.CommonFileInputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.GraphFilterAware;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.structure.Edge;
@@ -90,7 +90,7 @@ public final class ScriptRecordReader extends RecordReader<NullWritable, VertexW
                 final Bindings bindings = this.engine.createBindings();
                 bindings.put(LINE, this.lineRecordReader.getCurrentValue().toString());
                 bindings.put(FACTORY, new ScriptElementFactory());
-                final Vertex vertex = CommonFileInputFormat.applyVertexAndEdgeFilters((Vertex) engine.eval(READ_CALL, bindings), this.vertexFilter, this.edgeFilter);
+                final Vertex vertex = GraphFilterAware.applyVertexAndEdgeFilters((Vertex) engine.eval(READ_CALL, bindings), this.vertexFilter, this.edgeFilter);
                 if (vertex != null) {
                     this.vertexWritable.set(vertex);
                     return true;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/64c68406/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 6606dae..a93bf06 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -150,7 +150,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 outputRDD = hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, OutputFormatRDD.class, OutputRDD.class).newInstance();
                 // if the input class can filter on load, then set the filters
                 if (inputRDD instanceof InputFormatRDD && GraphFilterAware.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class, InputFormat.class))) {
-                    InputFormatRDD.storeVertexAndEdgeFilters(apacheConfiguration, hadoopConfiguration, this.vertexFilter, this.edgeFilter);
+                    GraphFilterAware.storeVertexAndEdgeFilters(apacheConfiguration, hadoopConfiguration, this.vertexFilter, this.edgeFilter);
                     filtered = false;
                 } else if (inputRDD instanceof GraphFilterAware) {
                     if (null != this.vertexFilter)

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/64c68406/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputFormatRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputFormatRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputFormatRDD.java
index 7e7d1f7..57d7080 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputFormatRDD.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputFormatRDD.java
@@ -30,10 +30,6 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
 import scala.Tuple2;
 
 /**
@@ -62,15 +58,4 @@ public final class InputFormatRDD implements InputRDD {
                 ObjectWritable.class)
                 .mapToPair(tuple -> new Tuple2<>((K) ((Tuple2<ObjectWritable, ObjectWritable>) tuple)._1().get(), (V) ((Tuple2<ObjectWritable, ObjectWritable>) tuple)._2().get()));
     }
-
-    public static void storeVertexAndEdgeFilters(final Configuration apacheConfiguration, final org.apache.hadoop.conf.Configuration hadoopConfiguration, final Traversal.Admin<Vertex, Vertex> vertexFilter, final Traversal.Admin<Edge, Edge> edgeFilter) {
-        if (null != vertexFilter) {
-            VertexProgramHelper.serialize(vertexFilter, apacheConfiguration, Constants.GREMLIN_HADOOP_VERTEX_FILTER);
-            hadoopConfiguration.set(Constants.GREMLIN_HADOOP_VERTEX_FILTER, apacheConfiguration.getString(Constants.GREMLIN_HADOOP_VERTEX_FILTER));
-        }
-        if (null != edgeFilter) {
-            VertexProgramHelper.serialize(edgeFilter, apacheConfiguration, Constants.GREMLIN_HADOOP_EDGE_FILTER);
-            hadoopConfiguration.set(Constants.GREMLIN_HADOOP_VERTEX_FILTER, apacheConfiguration.getString(Constants.GREMLIN_HADOOP_EDGE_FILTER));
-        }
-    }
 }