You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/02/01 22:45:54 UTC

incubator-tinkerpop git commit: Created a CommonFileInputFormat abstract class that both GryoInputFormat and ScriptInputFormat now extend. It handles all vertex/edge filter construction and has helper methods for filtering the StarVertex before it is fully loaded by the InputFormat.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-962 3b3e008ce -> 3485d8454


Created a CommonFileInputFormat abstract class that both GryoInputFormat and ScriptInputFormat now extend. It handles all vertex/edge filter construction and has helper methods for filtering the StarVertex before it is fully loaded by the InputFormat. This opens the door to fine-grained control over vertex loading, especially with GryoInputFormat (e.g. once a vertex's properties are loaded, check the vertex filter and, if it fails, skip deserializing its edges entirely). As currently implemented, the full vertex is materialized and then filtered before the RecordReader's nextKeyValue() returns it.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/3485d845
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/3485d845
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/3485d845

Branch: refs/heads/TINKERPOP-962
Commit: 3485d8454855938fd7c0c24d5c3f9c3eb6ab308a
Parents: 3b3e008
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Feb 1 14:45:48 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Feb 1 14:45:48 2016 -0700

----------------------------------------------------------------------
 .../process/computer/GraphComputerTest.java     |  1 +
 .../tinkerpop/gremlin/hadoop/Constants.java     |  3 +
 .../structure/io/CommonFileInputFormat.java     | 89 ++++++++++++++++++++
 .../structure/io/gryo/GryoInputFormat.java      |  8 +-
 .../structure/io/gryo/GryoRecordReader.java     | 21 ++++-
 .../structure/io/script/ScriptInputFormat.java  | 16 ++--
 .../structure/io/script/ScriptRecordReader.java | 11 ++-
 .../process/computer/SparkGraphComputer.java    | 12 ++-
 .../spark/structure/io/InputFormatRDD.java      | 15 ++++
 9 files changed, 154 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3485d845/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
index 361e402..12c1bf3 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
@@ -34,6 +34,7 @@ import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.VertexProperty;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3485d845/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
index aa0bca5..e404570 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
@@ -38,6 +38,9 @@ public final class Constants {
     public static final String GREMLIN_HADOOP_GRAPH_INPUT_FORMAT_HAS_EDGES = "gremlin.hadoop.graphOutputFormat.hasEdges";
     public static final String GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES = "gremlin.hadoop.graphInputFormat.hasEdges";
 
+    public static final String GREMLIN_HADOOP_VERTEX_FILTER = "gremlin.hadoop.vertexFilter"; // key holding the serialized vertex filter traversal (read by CommonFileInputFormat)
+    public static final String GREMLIN_HADOOP_EDGE_FILTER = "gremlin.hadoop.edgeFilter"; // key holding the serialized edge filter traversal (read by CommonFileInputFormat)
+
     public static final String GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE = "gremlin.hadoop.jarsInDistributedCache";
     public static final String HIDDEN_G = Graph.Hidden.hide("g");
     public static final String GREMLIN_HADOOP_JOB_PREFIX = "HadoopGremlin: ";

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3485d845/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileInputFormat.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileInputFormat.java
new file mode 100644
index 0000000..679fd68
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileInputFormat.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.structure.io;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+import java.util.Iterator;
+
+/**
+ * Base {@link FileInputFormat} for file-based graph input formats (such as Gryo and Script) that apply
+ * vertex and edge filters while reading records.
+ * <p>
+ * The filters are not passed in through {@link #setVertexFilter(Traversal)} or {@link #setEdgeFilter(Traversal)}
+ * (both are no-ops); subclasses call {@link #loadVertexAndEdgeFilters(TaskAttemptContext)}, which deserializes
+ * them from the {@link Constants#GREMLIN_HADOOP_VERTEX_FILTER} and {@link Constants#GREMLIN_HADOOP_EDGE_FILTER}
+ * configuration keys.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public abstract class CommonFileInputFormat extends FileInputFormat<NullWritable, VertexWritable> implements HadoopPoolsConfigurable, GraphFilterAware {
+
+    // Deserialized filter traversals; each stays null when no filter of that kind is configured.
+    protected Traversal.Admin<Vertex, Vertex> vertexFilter = null;
+    protected Traversal.Admin<Edge, Edge> edgeFilter = null;
+    // Ensures the configuration is inspected (and the filters deserialized) at most once per instance.
+    private boolean filtersLoaded = false;
+
+    /**
+     * Deserializes the vertex and edge filters from the task configuration on the first call;
+     * later calls do nothing.
+     *
+     * @param context the task context whose configuration may contain serialized filters
+     */
+    protected void loadVertexAndEdgeFilters(final TaskAttemptContext context) {
+        if (this.filtersLoaded)
+            return;
+        final org.apache.hadoop.conf.Configuration hadoopConfiguration = context.getConfiguration();
+        final boolean hasVertexFilter = null != hadoopConfiguration.get(Constants.GREMLIN_HADOOP_VERTEX_FILTER, null);
+        final boolean hasEdgeFilter = null != hadoopConfiguration.get(Constants.GREMLIN_HADOOP_EDGE_FILTER, null);
+        if (hasVertexFilter || hasEdgeFilter) {
+            // convert the Hadoop configuration once and reuse it for both filters
+            final org.apache.commons.configuration.Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(hadoopConfiguration);
+            if (hasVertexFilter)
+                this.vertexFilter = VertexProgramHelper.deserialize(apacheConfiguration, Constants.GREMLIN_HADOOP_VERTEX_FILTER);
+            if (hasEdgeFilter)
+                this.edgeFilter = VertexProgramHelper.deserialize(apacheConfiguration, Constants.GREMLIN_HADOOP_EDGE_FILTER);
+        }
+        this.filtersLoaded = true;
+    }
+
+    /**
+     * Returns {@code true} only when no compression codec matches {@code file}, so only uncompressed files are split.
+     */
+    @Override
+    protected boolean isSplitable(final JobContext context, final Path file) {
+        return null == new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
+    }
+
+    /**
+     * No-op: the vertex filter is read from the configuration by {@link #loadVertexAndEdgeFilters(TaskAttemptContext)}.
+     */
+    @Override
+    public void setVertexFilter(final Traversal<Vertex, Vertex> vertexFilter) {
+        // do nothing. loaded through configuration.
+    }
+
+    /**
+     * No-op: the edge filter is read from the configuration by {@link #loadVertexAndEdgeFilters(TaskAttemptContext)}.
+     */
+    @Override
+    public void setEdgeFilter(final Traversal<Edge, Edge> edgeFilter) {
+        // do nothing. loaded through configuration.
+    }
+
+    /**
+     * Applies the vertex filter to {@code vertex} and, if it passes, removes every incident edge that fails
+     * the edge filter.
+     *
+     * @param vertex       the vertex just read, possibly {@code null}
+     * @param vertexFilter traversal the vertex must satisfy, or {@code null} to accept every vertex
+     * @param edgeFilter   traversal each incident edge must satisfy, or {@code null} to keep every edge
+     * @return the (possibly edge-pruned) vertex, or {@code null} if {@code vertex} is {@code null} or fails
+     *         the vertex filter
+     */
+    public static final Vertex applyVertexAndEdgeFilters(final Vertex vertex, final Traversal.Admin<Vertex, Vertex> vertexFilter, final Traversal.Admin<Edge, Edge> edgeFilter) {
+        if (null == vertex || (null != vertexFilter && !TraversalUtil.test(vertex, vertexFilter)))
+            return null;
+        if (null != edgeFilter) {
+            // NOTE(review): pruning relies on the vertex's edge iterator supporting remove(); presumably true
+            // for the StarVertex instances the record readers produce -- confirm for other Vertex implementations.
+            final Iterator<Edge> edgeIterator = vertex.edges(Direction.BOTH);
+            while (edgeIterator.hasNext()) {
+                if (!TraversalUtil.test(edgeIterator.next(), edgeFilter))
+                    edgeIterator.remove();
+            }
+        }
+        return vertex;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3485d845/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoInputFormat.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoInputFormat.java
index 7a53a39..ebcd03c 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoInputFormat.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoInputFormat.java
@@ -22,8 +22,7 @@ import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPoolsConfigurable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.CommonFileInputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 
 import java.io.IOException;
@@ -31,11 +30,16 @@ import java.io.IOException;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class GryoInputFormat extends FileInputFormat<NullWritable, VertexWritable> implements HadoopPoolsConfigurable {
+public final class GryoInputFormat extends CommonFileInputFormat {
 
+    /**
+     * Creates and initializes a {@link GryoRecordReader} for {@code split}, handing it the vertex and edge
+     * filters loaded from the task configuration (either may be {@code null}).
+     */
     @Override
     public RecordReader<NullWritable, VertexWritable> createRecordReader(final InputSplit split, final TaskAttemptContext context) throws IOException, InterruptedException {
-        final RecordReader<NullWritable, VertexWritable> reader = new GryoRecordReader();
+        super.loadVertexAndEdgeFilters(context);
+        final RecordReader<NullWritable, VertexWritable> reader = new GryoRecordReader(this.vertexFilter, this.edgeFilter);
         reader.initialize(split, context);
         return reader;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3485d845/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
index 8f3886d..ad4f936 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
@@ -27,8 +27,12 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.CommonFileInputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.VertexTerminator;
@@ -54,8 +58,12 @@ public final class GryoRecordReader extends RecordReader<NullWritable, VertexWri
 
     private long currentLength = 0;
     private long splitLength;
+    private final Traversal.Admin<Vertex, Vertex> vertexFilter; // this reader's own copy of the vertex filter, or null for none
+    private final Traversal.Admin<Edge, Edge> edgeFilter; // this reader's own copy of the edge filter, or null for none
 
-    public GryoRecordReader() {
+    public GryoRecordReader(final Traversal.Admin<Vertex, Vertex> vertexFilter, final Traversal.Admin<Edge, Edge> edgeFilter) { // either filter may be null
+        this.vertexFilter = null == vertexFilter ? null : vertexFilter.clone(); // clone: the InputFormat hands the same instance to every reader it creates
+        this.edgeFilter = null == edgeFilter ? null : edgeFilter.clone();
     }
 
     @Override
@@ -123,8 +131,15 @@ public final class GryoRecordReader extends RecordReader<NullWritable, VertexWri
             terminatorLocation = ((byte) currentByte) == TERMINATOR[terminatorLocation] ? terminatorLocation + 1 : 0;
             if (terminatorLocation >= TERMINATOR.length) {
                 try (InputStream in = new ByteArrayInputStream(output.toByteArray())) {
-                    this.vertexWritable.set(this.gryoReader.readVertex(in, Attachable::get)); // I know how GryoReader works, so I'm cheating here
-                    return true;
+                    final Vertex vertex = CommonFileInputFormat.applyVertexAndEdgeFilters(this.gryoReader.readVertex(in, Attachable::get), this.vertexFilter, this.edgeFilter); // I know how GryoReader works, so I'm cheating here
+                    if (null != vertex) {
+                        this.vertexWritable.set(vertex);
+                        return true;
+                    } else {
+                        currentVertexLength = 0;
+                        terminatorLocation = 0;
+                        output.reset();
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3485d845/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptInputFormat.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptInputFormat.java
index f6d6c4c..ef4b59a 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptInputFormat.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptInputFormat.java
@@ -25,8 +25,7 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPoolsConfigurable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.CommonFileInputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 
 import java.io.IOException;
@@ -35,19 +34,18 @@ import java.io.IOException;
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  * @author Daniel Kuppitz (http://gremlin.guru)
  */
-public final class ScriptInputFormat extends FileInputFormat<NullWritable, VertexWritable> implements HadoopPoolsConfigurable {
+public final class ScriptInputFormat extends CommonFileInputFormat {
 
+    /**
+     * Creates and initializes a {@link ScriptRecordReader} for {@code split}, handing it the vertex and edge
+     * filters loaded from the task configuration (either may be {@code null}).
+     */
     @Override
-    public RecordReader<NullWritable, VertexWritable> createRecordReader(final InputSplit split, final TaskAttemptContext context)
-            throws IOException, InterruptedException {
-
-        RecordReader<NullWritable, VertexWritable> reader = new ScriptRecordReader();
+    public RecordReader<NullWritable, VertexWritable> createRecordReader(final InputSplit split, final TaskAttemptContext context) throws IOException, InterruptedException {
+        super.loadVertexAndEdgeFilters(context);
+        final RecordReader<NullWritable, VertexWritable> reader = new ScriptRecordReader(this.vertexFilter, this.edgeFilter);
         reader.initialize(split, context);
         return reader;
     }
 
-    @Override
-    protected boolean isSplitable(final JobContext context, final Path file) {
-        return null == new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3485d845/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java
index 4cc1602..44d60b4 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java
@@ -29,7 +29,9 @@ import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
 import org.apache.tinkerpop.gremlin.groovy.CompilerCustomizerProvider;
 import org.apache.tinkerpop.gremlin.groovy.DefaultImportCustomizerProvider;
 import org.apache.tinkerpop.gremlin.groovy.jsr223.GremlinGroovyScriptEngine;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.CommonFileInputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.T;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
@@ -57,8 +59,13 @@ public final class ScriptRecordReader extends RecordReader<NullWritable, VertexW
     private final LineRecordReader lineRecordReader;
     private ScriptEngine engine;
 
-    public ScriptRecordReader() {
+    private final Traversal.Admin<Vertex, Vertex> vertexFilter; // this reader's own copy of the vertex filter, or null for none
+    private final Traversal.Admin<Edge, Edge> edgeFilter; // this reader's own copy of the edge filter, or null for none
+
+    public ScriptRecordReader(final Traversal.Admin<Vertex, Vertex> vertexFilter, final Traversal.Admin<Edge, Edge> edgeFilter) { // either filter may be null
         this.lineRecordReader = new LineRecordReader();
+        this.vertexFilter = null == vertexFilter ? null : vertexFilter.clone(); // clone: the InputFormat hands the same instance to every reader it creates
+        this.edgeFilter = null == edgeFilter ? null : edgeFilter.clone();
     }
 
     @Override
@@ -83,7 +90,7 @@ public final class ScriptRecordReader extends RecordReader<NullWritable, VertexW
                 final Bindings bindings = this.engine.createBindings();
                 bindings.put(LINE, this.lineRecordReader.getCurrentValue().toString());
                 bindings.put(FACTORY, new ScriptElementFactory());
-                final Vertex vertex = (Vertex) engine.eval(READ_CALL, bindings);
+                final Vertex vertex = CommonFileInputFormat.applyVertexAndEdgeFilters((Vertex) engine.eval(READ_CALL, bindings), this.vertexFilter, this.edgeFilter);
                 if (vertex != null) {
                     this.vertexWritable.set(vertex);
                     return true;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3485d845/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 75e7d6c..30764b4 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -144,19 +144,29 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
             final boolean outputToSpark = PersistedOutputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, Object.class));
             final InputRDD inputRDD;
             final OutputRDD outputRDD;
+            final boolean filtered; // true when the input cannot filter on load and there are filters still to apply
             try {
                 inputRDD = hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, InputFormatRDD.class, InputRDD.class).newInstance();
                 outputRDD = hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, OutputFormatRDD.class, OutputRDD.class).newInstance();
-                if (inputRDD instanceof GraphFilterAware) { // if the input class can filter on load, then set the filters
+                // if the input class can filter on load, then set the filters
+                if (inputRDD instanceof InputFormatRDD && GraphFilterAware.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class, InputFormat.class))) {
+                    // the Hadoop InputFormat filters on load; it receives the filters through the configuration
+                    InputFormatRDD.storeVertexAndEdgeFilters(apacheConfiguration, hadoopConfiguration, this.vertexFilter, this.edgeFilter);
+                    filtered = false;
+                } else if (inputRDD instanceof GraphFilterAware) {
                     if (null != this.vertexFilter)
                         ((GraphFilterAware) inputRDD).setVertexFilter(this.vertexFilter);
                     if (null != edgeFilter)
                         ((GraphFilterAware) inputRDD).setEdgeFilter(this.edgeFilter);
+                    filtered = false;
+                } else {
+                    // the input cannot filter on load, so filtering is only needed when a filter is actually set
+                    filtered = null != this.vertexFilter || null != this.edgeFilter;
                 }
             } catch (final InstantiationException | IllegalAccessException e) {
                 throw new IllegalStateException(e.getMessage(), e);
             }
-            final boolean filtered = (this.vertexFilter != null || this.edgeFilter != null) && !(inputRDD instanceof GraphFilterAware);
+
             SparkMemory memory = null;
             // delete output location
             final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3485d845/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputFormatRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputFormatRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputFormatRDD.java
index 57d7080..7e7d1f7 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputFormatRDD.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputFormatRDD.java
@@ -30,6 +30,10 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
 import scala.Tuple2;
 
 /**
@@ -58,4 +62,25 @@ public final class InputFormatRDD implements InputRDD {
                 ObjectWritable.class)
                 .mapToPair(tuple -> new Tuple2<>((K) ((Tuple2<ObjectWritable, ObjectWritable>) tuple)._1().get(), (V) ((Tuple2<ObjectWritable, ObjectWritable>) tuple)._2().get()));
     }
+
+    /**
+     * Serializes the given vertex and edge filters into {@code apacheConfiguration} and copies each serialized
+     * value into {@code hadoopConfiguration} under the same key, so a {@code GraphFilterAware} Hadoop
+     * {@code InputFormat} can deserialize and apply them while reading. A {@code null} filter is skipped.
+     *
+     * @param apacheConfiguration configuration the filters are serialized into
+     * @param hadoopConfiguration configuration that receives a copy of each serialized filter
+     * @param vertexFilter        the vertex filter, or {@code null}
+     * @param edgeFilter          the edge filter, or {@code null}
+     */
+    public static void storeVertexAndEdgeFilters(final Configuration apacheConfiguration, final org.apache.hadoop.conf.Configuration hadoopConfiguration, final Traversal.Admin<Vertex, Vertex> vertexFilter, final Traversal.Admin<Edge, Edge> edgeFilter) {
+        if (null != vertexFilter) {
+            VertexProgramHelper.serialize(vertexFilter, apacheConfiguration, Constants.GREMLIN_HADOOP_VERTEX_FILTER);
+            hadoopConfiguration.set(Constants.GREMLIN_HADOOP_VERTEX_FILTER, apacheConfiguration.getString(Constants.GREMLIN_HADOOP_VERTEX_FILTER));
+        }
+        if (null != edgeFilter) {
+            VertexProgramHelper.serialize(edgeFilter, apacheConfiguration, Constants.GREMLIN_HADOOP_EDGE_FILTER);
+            hadoopConfiguration.set(Constants.GREMLIN_HADOOP_EDGE_FILTER, apacheConfiguration.getString(Constants.GREMLIN_HADOOP_EDGE_FILTER));
+        }
+    }
 }