You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/02/05 08:09:06 UTC

incubator-tinkerpop git commit: Created a GraphFilterInputFormat which, if the user provided INPUT_FORMAT is not GraphFilterAware, it will filter the vertex before sending it up the I/O stack. Thus, GraphSONInputFormat, which is currently NOT GraphFilter

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-962 ed18cd938 -> 502113545


Created a GraphFilterInputFormat which, if the user-provided INPUT_FORMAT is not GraphFilterAware, filters each vertex before sending it up the I/O stack. Thus, GraphSONInputFormat, which is currently NOT GraphFilterAware, is wrapped by GraphFilterInputFormat. This greatly reduces the complexity of dealing with GraphFilters in Hadoop (e.g., Giraph vs. MapReduce). Along the way, I did a lot of nice cleanup and organization of our various input/output formats and readers/writers. We had a lot of anti-patterns that I have since cleaned up.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/50211354
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/50211354
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/50211354

Branch: refs/heads/TINKERPOP-962
Commit: 5021135451605cfec4212babb721fedeb425c07f
Parents: ed18cd9
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Feb 5 00:09:04 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Feb 5 00:09:04 2016 -0700

----------------------------------------------------------------------
 .../process/computer/GiraphGraphComputer.java   |   8 +-
 .../structure/io/GiraphVertexInputFormat.java   |  39 ++------
 .../structure/io/GiraphVertexOutputFormat.java  |  22 ++--
 .../giraph/structure/io/GiraphVertexReader.java |  29 +-----
 .../giraph/structure/io/GiraphVertexWriter.java |  11 +-
 .../computer/GraphFilterInputFormat.java        |  54 ++++++++++
 .../computer/GraphFilterRecordReader.java       | 100 +++++++++++++++++++
 .../process/computer/util/MapReduceHelper.java  |  10 +-
 .../structure/io/CommonFileInputFormat.java     |  15 ---
 .../structure/io/CommonFileOutputFormat.java    |   2 +-
 .../structure/io/ObjectWritableIterator.java    |   4 +-
 .../structure/io/VertexWritableIterator.java    |   4 +-
 .../io/graphson/GraphSONOutputFormat.java       |   2 +-
 .../structure/io/gryo/GryoInputFormat.java      |   3 +-
 .../structure/io/gryo/GryoOutputFormat.java     |   2 +-
 .../structure/io/gryo/GryoRecordReader.java     |  12 ++-
 .../structure/io/script/ScriptInputFormat.java  |   6 +-
 .../structure/io/script/ScriptOutputFormat.java |   2 +-
 .../structure/io/script/ScriptRecordReader.java |  10 +-
 19 files changed, 213 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index 7e5a998..fce64a4 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -108,6 +108,7 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
         super.program(vertexProgram);
         this.memory.addVertexProgramMemoryComputeKeys(this.vertexProgram);
         final BaseConfiguration apacheConfiguration = new BaseConfiguration();
+        apacheConfiguration.setDelimiterParsingDisabled(true);
         vertexProgram.storeState(apacheConfiguration);
         ConfUtil.mergeApacheIntoHadoopConfiguration(apacheConfiguration, this.giraphConfiguration);
         this.vertexProgram.getMessageCombiner().ifPresent(combiner -> this.giraphConfiguration.setMessageCombinerClass(GiraphMessageCombiner.class));
@@ -145,7 +146,8 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
         this.giraphConfiguration.setBoolean(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, this.persist.equals(Persist.EDGES));
         try {
             // store vertex and edge filters (will propagate down to native InputFormat or else GiraphVertexInputFormat will process)
-            final Configuration apacheConfiguration = new BaseConfiguration();
+            final BaseConfiguration apacheConfiguration = new BaseConfiguration();
+            apacheConfiguration.setDelimiterParsingDisabled(true);
             GraphFilterAware.storeGraphFilter(apacheConfiguration, this.giraphConfiguration, this.graphFilter);
 
             // it is possible to run graph computer without a vertex program (and thus, only map reduce jobs if they exist)
@@ -157,8 +159,6 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
                     if (e.getCause() instanceof NumberFormatException)
                         throw new NotSerializableException("The provided traversal is not serializable and thus, can not be distributed across the cluster");
                 }
-                // prepare the giraph vertex-centric computing job
-                final GiraphJob job = new GiraphJob(this.giraphConfiguration, Constants.GREMLIN_HADOOP_GIRAPH_JOB_PREFIX + this.vertexProgram);
                 // split required workers across system (open map slots + max threads per machine = total amount of TinkerPop workers)
                 if (!this.useWorkerThreadsInConfiguration) {
                     final Cluster cluster = new Cluster(GiraphGraphComputer.this.giraphConfiguration);
@@ -174,6 +174,8 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
                         this.giraphConfiguration.setNumComputeThreads(threadsPerMapper);
                     }
                 }
+                // prepare the giraph vertex-centric computing job
+                final GiraphJob job = new GiraphJob(this.giraphConfiguration, Constants.GREMLIN_HADOOP_GIRAPH_JOB_PREFIX + this.vertexProgram);
                 // handle input paths (if any)
                 if (FileInputFormat.class.isAssignableFrom(this.giraphConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class))) {
                     FileInputFormat.setInputPaths(job.getInternalJob(), Constants.getSearchGraphLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION), storage).get());

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexInputFormat.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexInputFormat.java
index 3bb306b..5900663 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexInputFormat.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexInputFormat.java
@@ -21,18 +21,10 @@ package org.apache.tinkerpop.gremlin.giraph.structure.io;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexReader;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.GraphFilterAware;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
-import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.GraphFilterInputFormat;
 
 import java.io.IOException;
 import java.util.List;
@@ -42,11 +34,6 @@ import java.util.List;
  */
 public final class GiraphVertexInputFormat extends VertexInputFormat {
 
-    private InputFormat<NullWritable, VertexWritable> hadoopGraphInputFormat;
-    protected GraphFilter graphFilter = new GraphFilter();
-    private boolean graphFilterLoaded = false;
-    private boolean graphFilterAware = false;
-
     @Override
     public void checkInputSpecs(final Configuration configuration) {
 
@@ -54,29 +41,17 @@ public final class GiraphVertexInputFormat extends VertexInputFormat {
 
     @Override
     public List<InputSplit> getSplits(final JobContext context, final int minSplitCountHint) throws IOException, InterruptedException {
-        this.constructor(context.getConfiguration());
-        return this.hadoopGraphInputFormat.getSplits(context);
+        return new GraphFilterInputFormat().getSplits(context);
     }
 
     @Override
     public VertexReader createVertexReader(final InputSplit split, final TaskAttemptContext context) throws IOException {
-        this.constructor(context.getConfiguration());
         try {
-            return new GiraphVertexReader(this.hadoopGraphInputFormat.createRecordReader(split, context), this.graphFilterAware, this.graphFilter);
-        } catch (InterruptedException e) {
-            throw new IOException(e);
-        }
-    }
-
-    private final void constructor(final Configuration configuration) {
-        if (null == this.hadoopGraphInputFormat) {
-            this.hadoopGraphInputFormat = ReflectionUtils.newInstance(configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class, InputFormat.class), configuration);
-            this.graphFilterAware = this.hadoopGraphInputFormat instanceof GraphFilterAware;
-            if (!this.graphFilterLoaded) {
-                if (configuration.get(Constants.GREMLIN_HADOOP_GRAPH_FILTER, null) != null)
-                    this.graphFilter = VertexProgramHelper.deserialize(ConfUtil.makeApacheConfiguration(configuration), Constants.GREMLIN_HADOOP_GRAPH_FILTER);
-                this.graphFilterLoaded = true;
-            }
+            final GiraphVertexReader reader = new GiraphVertexReader();
+            reader.initialize(split, context);
+            return reader;
+        } catch (final InterruptedException e) {
+            throw new IOException(e.getMessage(), e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexOutputFormat.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexOutputFormat.java
index 2d9b4ba..4689db1 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexOutputFormat.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexOutputFormat.java
@@ -18,17 +18,15 @@
  */
 package org.apache.tinkerpop.gremlin.giraph.structure.io;
 
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.VertexWriter;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
 
 import java.io.IOException;
 
@@ -37,29 +35,21 @@ import java.io.IOException;
  */
 public final class GiraphVertexOutputFormat extends VertexOutputFormat {
 
-    private OutputFormat<NullWritable, VertexWritable> hadoopGraphOutputFormat;
-
     @Override
     public VertexWriter createVertexWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
-        this.constructor(context.getConfiguration());
-        return new GiraphVertexWriter(this.hadoopGraphOutputFormat);
+        return new GiraphVertexWriter();
     }
 
     @Override
     public void checkOutputSpecs(final JobContext context) throws IOException, InterruptedException {
-        this.constructor(context.getConfiguration());
-        this.hadoopGraphOutputFormat.checkOutputSpecs(context);
+        final Configuration configuration = context.getConfiguration();
+        ReflectionUtils.newInstance(configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class, OutputFormat.class), configuration).checkOutputSpecs(context);
     }
 
     @Override
     public OutputCommitter getOutputCommitter(final TaskAttemptContext context) throws IOException, InterruptedException {
-        this.constructor(context.getConfiguration());
-        return this.hadoopGraphOutputFormat.getOutputCommitter(context);
+        final Configuration configuration = context.getConfiguration();
+        return ReflectionUtils.newInstance(configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class, OutputFormat.class), configuration).getOutputCommitter(context);
     }
 
-    private final void constructor(final Configuration configuration) {
-        if (null == this.hadoopGraphOutputFormat) {
-            this.hadoopGraphOutputFormat = ReflectionUtils.newInstance(configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class, OutputFormat.class), configuration);
-        }
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexReader.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexReader.java
index 450e298..5335980 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexReader.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexReader.java
@@ -25,12 +25,10 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.tinkerpop.gremlin.giraph.process.computer.GiraphVertex;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.GraphFilterRecordReader;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
-import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
 
 import java.io.IOException;
-import java.util.Optional;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -38,13 +36,9 @@ import java.util.Optional;
 public final class GiraphVertexReader extends VertexReader {
 
     private RecordReader<NullWritable, VertexWritable> recordReader;
-    private final GraphFilter graphFilter;
-    private final boolean graphFilterAware;
 
-    public GiraphVertexReader(final RecordReader<NullWritable, VertexWritable> recordReader, final boolean graphFilterAware, final GraphFilter graphFilter) {
-        this.recordReader = recordReader;
-        this.graphFilterAware = graphFilterAware;
-        this.graphFilter = graphFilter.clone();
+    public GiraphVertexReader() {
+        this.recordReader = new GraphFilterRecordReader();
     }
 
     @Override
@@ -54,22 +48,7 @@ public final class GiraphVertexReader extends VertexReader {
 
     @Override
     public boolean nextVertex() throws IOException, InterruptedException {
-        if (this.graphFilterAware) {
-            return this.recordReader.nextKeyValue();
-        } else {
-            while (true) {
-                if (this.recordReader.nextKeyValue()) {
-                    final VertexWritable vertexWritable = this.recordReader.getCurrentValue();
-                    final Optional<StarGraph.StarVertex> vertex = this.graphFilter.applyGraphFilter(vertexWritable.get());
-                    if (vertex.isPresent()) {
-                        vertexWritable.set(vertex.get());
-                        return true;
-                    }
-                } else {
-                    return false;
-                }
-            }
-        }
+        return this.recordReader.nextKeyValue();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexWriter.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexWriter.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexWriter.java
index 43c4b0b..9bf5c13 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexWriter.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexWriter.java
@@ -20,11 +20,14 @@ package org.apache.tinkerpop.gremlin.giraph.structure.io;
 
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.io.VertexWriter;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tinkerpop.gremlin.giraph.process.computer.GiraphVertex;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 
 import java.io.IOException;
@@ -33,16 +36,16 @@ import java.io.IOException;
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 public final class GiraphVertexWriter extends VertexWriter {
-    private final OutputFormat<NullWritable, VertexWritable> outputFormat;
     private RecordWriter<NullWritable, VertexWritable> recordWriter;
 
-    public GiraphVertexWriter(final OutputFormat<NullWritable, VertexWritable> outputFormat) {
-        this.outputFormat = outputFormat;
+    public GiraphVertexWriter() {
+
     }
 
     @Override
     public void initialize(final TaskAttemptContext context) throws IOException, InterruptedException {
-        this.recordWriter = this.outputFormat.getRecordWriter(context);
+        final Configuration configuration = context.getConfiguration();
+        this.recordWriter = ReflectionUtils.newInstance(configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class, OutputFormat.class), configuration).getRecordWriter(context);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/GraphFilterInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/GraphFilterInputFormat.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/GraphFilterInputFormat.java
new file mode 100644
index 0000000..1d1b150
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/GraphFilterInputFormat.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.process.computer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GraphFilterInputFormat extends InputFormat<NullWritable, VertexWritable> {
+
+    @Override
+    public List<InputSplit> getSplits(final JobContext jobContext) throws IOException, InterruptedException {
+        final Configuration configuration = jobContext.getConfiguration();
+        return ReflectionUtils.newInstance(configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class, InputFormat.class), configuration).getSplits(jobContext);
+    }
+
+    @Override
+    public RecordReader<NullWritable, VertexWritable> createRecordReader(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+        final GraphFilterRecordReader recordReader = new GraphFilterRecordReader();
+        recordReader.initialize(inputSplit, taskAttemptContext);
+        return recordReader;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/GraphFilterRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/GraphFilterRecordReader.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/GraphFilterRecordReader.java
new file mode 100644
index 0000000..e2114ef
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/GraphFilterRecordReader.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.process.computer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.GraphFilterAware;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
+import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
+import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GraphFilterRecordReader extends RecordReader<NullWritable, VertexWritable> {
+
+    private GraphFilter graphFilter = null;
+    private RecordReader<NullWritable, VertexWritable> recordReader;
+
+    public GraphFilterRecordReader() {
+    }
+
+    @Override
+    public void initialize(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+        final Configuration configuration = taskAttemptContext.getConfiguration();
+        final InputFormat<NullWritable, VertexWritable> inputFormat = ReflectionUtils.newInstance(configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class, InputFormat.class), configuration);
+        if (!(inputFormat instanceof GraphFilterAware) && configuration.get(Constants.GREMLIN_HADOOP_GRAPH_FILTER, null) != null)
+            this.graphFilter = VertexProgramHelper.deserialize(ConfUtil.makeApacheConfiguration(configuration), Constants.GREMLIN_HADOOP_GRAPH_FILTER);
+        this.recordReader = inputFormat.createRecordReader(inputSplit, taskAttemptContext);
+        this.recordReader.initialize(inputSplit, taskAttemptContext);
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+        if (null == this.graphFilter) {
+            return this.recordReader.nextKeyValue();
+        } else {
+            while (true) {
+                if (this.recordReader.nextKeyValue()) {
+                    final VertexWritable vertexWritable = this.recordReader.getCurrentValue();
+                    final Optional<StarGraph.StarVertex> vertex = this.graphFilter.applyGraphFilter(vertexWritable.get());
+                    if (vertex.isPresent()) {
+                        vertexWritable.set(vertex.get());
+                        return true;
+                    }
+                } else {
+                    return false;
+                }
+            }
+        }
+    }
+
+    @Override
+    public NullWritable getCurrentKey() throws IOException, InterruptedException {
+        return NullWritable.get();
+    }
+
+    @Override
+    public VertexWritable getCurrentValue() throws IOException, InterruptedException {
+        return this.recordReader.getCurrentValue();
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+        return this.recordReader.getProgress();
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.recordReader.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java
index 6e0cd9e..5cf5d1e 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java
@@ -22,7 +22,6 @@ import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -32,6 +31,7 @@ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.GraphFilterInputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.HadoopCombine;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.HadoopMap;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.HadoopReduce;
@@ -72,6 +72,8 @@ public final class MapReduceHelper {
             final Optional<Comparator<?>> reduceSort = mapReduce.getReduceKeySort();
 
             newConfiguration.setClass(Constants.GREMLIN_HADOOP_MAP_REDUCE_CLASS, mapReduce.getClass(), MapReduce.class);
+            if (vertexProgramExists)
+                newConfiguration.set(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputOutputHelper.getInputFormat((Class) newConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class)).getCanonicalName());
             final Job job = Job.getInstance(newConfiguration, mapReduce.toString());
             HadoopGraph.LOGGER.info(Constants.GREMLIN_HADOOP_JOB_PREFIX + mapReduce.toString());
             job.setJarByClass(HadoopGraph.class);
@@ -94,9 +96,7 @@ public final class MapReduceHelper {
             job.setMapOutputValueClass(ObjectWritable.class);
             job.setOutputKeyClass(ObjectWritable.class);
             job.setOutputValueClass(ObjectWritable.class);
-            job.setInputFormatClass(vertexProgramExists ?
-                    InputOutputHelper.getInputFormat((Class) newConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class)) :
-                    (Class) newConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class));
+            job.setInputFormatClass(GraphFilterInputFormat.class);
             job.setOutputFormatClass(SequenceFileOutputFormat.class);
             // if there is no vertex program, then grab the graph from the input location
             final Path graphPath = vertexProgramExists ?
@@ -130,7 +130,7 @@ public final class MapReduceHelper {
                 FileSystem.get(newConfiguration).delete(memoryPath, true); // delete the temporary memory path
                 memoryPath = sortedMemoryPath;
             }
-            mapReduce.addResultToMemory(memory, new ObjectWritableIterator(configuration, memoryPath));
+            mapReduce.addResultToMemory(memory, new ObjectWritableIterator(newConfiguration, memoryPath));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileInputFormat.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileInputFormat.java
index 9f7d458..e01cfab 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileInputFormat.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileInputFormat.java
@@ -23,29 +23,14 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
-import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 public abstract class CommonFileInputFormat extends FileInputFormat<NullWritable, VertexWritable> implements HadoopPoolsConfigurable, GraphFilterAware {
 
-    protected GraphFilter graphFilter = new GraphFilter();
-    private boolean graphFilterLoaded = false;
-
-    protected void loadVertexAndEdgeFilters(final TaskAttemptContext context) {
-        if (!this.graphFilterLoaded) {
-            if (context.getConfiguration().get(Constants.GREMLIN_HADOOP_GRAPH_FILTER, null) != null)
-                this.graphFilter = VertexProgramHelper.deserialize(ConfUtil.makeApacheConfiguration(context.getConfiguration()), Constants.GREMLIN_HADOOP_GRAPH_FILTER);
-            this.graphFilterLoaded = true;
-        }
-    }
-
     @Override
     protected boolean isSplitable(final JobContext context, final Path file) {
         return null == new CompressionCodecFactory(context.getConfiguration()).getCodec(file);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileOutputFormat.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileOutputFormat.java
index 1b7559a..5031abe 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileOutputFormat.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileOutputFormat.java
@@ -38,7 +38,7 @@ import java.io.IOException;
  */
 public abstract class CommonFileOutputFormat extends FileOutputFormat<NullWritable, VertexWritable> implements PersistResultGraphAware {
 
-    protected DataOutputStream getDataOuputStream(final TaskAttemptContext job) throws IOException, InterruptedException {
+    protected DataOutputStream getDataOutputStream(final TaskAttemptContext job) throws IOException, InterruptedException {
         final Configuration conf = job.getConfiguration();
         boolean isCompressed = getCompressOutput(job);
         CompressionCodec codec = null;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritableIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritableIterator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritableIterator.java
index 676ca07..5c90c5b 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritableIterator.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritableIterator.java
@@ -24,11 +24,11 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
 
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.NoSuchElementException;
 import java.util.Queue;
 
 /**
@@ -77,7 +77,7 @@ public final class ObjectWritableIterator implements Iterator<KeyValue> {
             } else {
                 while (true) {
                     if (this.readers.isEmpty())
-                        throw FastNoSuchElementException.instance();
+                        throw new NoSuchElementException();
                     if (this.readers.peek().next(this.key, this.value)) {
                         return new KeyValue<>(this.key.get(), this.value.get());
                     } else

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritableIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritableIterator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritableIterator.java
index d3e1fd0..835262f 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritableIterator.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritableIterator.java
@@ -23,12 +23,12 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.NoSuchElementException;
 import java.util.Queue;
 
 /**
@@ -76,7 +76,7 @@ public final class VertexWritableIterator implements Iterator<Vertex> {
             } else {
                 while (true) {
                     if (this.readers.isEmpty())
-                        throw FastNoSuchElementException.instance();
+                        throw new NoSuchElementException();
                     if (this.readers.peek().next(this.value)) {
                         return this.value.get();
                     } else

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONOutputFormat.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONOutputFormat.java
index 30a8cfa..4698e61 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONOutputFormat.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONOutputFormat.java
@@ -34,7 +34,7 @@ public final class GraphSONOutputFormat extends CommonFileOutputFormat implement
 
     @Override
     public RecordWriter<NullWritable, VertexWritable> getRecordWriter(final TaskAttemptContext job) throws IOException, InterruptedException {
-        return new GraphSONRecordWriter(getDataOuputStream(job), job.getConfiguration());
+        return new GraphSONRecordWriter(getDataOutputStream(job), job.getConfiguration());
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoInputFormat.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoInputFormat.java
index 78ece7c..d2b48ee 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoInputFormat.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoInputFormat.java
@@ -34,8 +34,7 @@ public final class GryoInputFormat extends CommonFileInputFormat {
 
     @Override
     public RecordReader<NullWritable, VertexWritable> createRecordReader(final InputSplit split, final TaskAttemptContext context) throws IOException, InterruptedException {
-        super.loadVertexAndEdgeFilters(context);
-        final RecordReader<NullWritable, VertexWritable> reader = new GryoRecordReader(this.graphFilter);
+        final RecordReader<NullWritable, VertexWritable> reader = new GryoRecordReader();
         reader.initialize(split, context);
         return reader;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoOutputFormat.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoOutputFormat.java
index 69d56ce..4f923ff 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoOutputFormat.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoOutputFormat.java
@@ -34,7 +34,7 @@ public final class GryoOutputFormat extends CommonFileOutputFormat implements Ha
 
     @Override
     public RecordWriter<NullWritable, VertexWritable> getRecordWriter(final TaskAttemptContext job) throws IOException, InterruptedException {
-        return new GryoRecordWriter(getDataOuputStream(job), job.getConfiguration());
+        return new GryoRecordWriter(getDataOutputStream(job), job.getConfiguration());
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
index 7ff362f..d7ed46b 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
@@ -27,9 +27,12 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
+import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader;
@@ -56,17 +59,18 @@ public final class GryoRecordReader extends RecordReader<NullWritable, VertexWri
 
     private long currentLength = 0;
     private long splitLength;
-    private final GraphFilter graphFilter;
+    private GraphFilter graphFilter = new GraphFilter();
+
+    public GryoRecordReader() {
 
-    public GryoRecordReader(final GraphFilter graphFilter) {
-        this.graphFilter = graphFilter.clone();
-        this.graphFilter.compileFilters();
     }
 
     @Override
     public void initialize(final InputSplit genericSplit, final TaskAttemptContext context) throws IOException {
         final FileSplit split = (FileSplit) genericSplit;
         final Configuration configuration = context.getConfiguration();
+        if (configuration.get(Constants.GREMLIN_HADOOP_GRAPH_FILTER, null) != null)
+            this.graphFilter = VertexProgramHelper.deserialize(ConfUtil.makeApacheConfiguration(configuration), Constants.GREMLIN_HADOOP_GRAPH_FILTER);
         HadoopPools.initialize(configuration);
         this.gryoReader = HadoopPools.getGryoPool().takeReader();
         long start = split.getStart();

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptInputFormat.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptInputFormat.java
index 8b9af01..4053949 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptInputFormat.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptInputFormat.java
@@ -18,11 +18,8 @@
  */
 package org.apache.tinkerpop.gremlin.hadoop.structure.io.script;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.CommonFileInputFormat;
@@ -38,8 +35,7 @@ public final class ScriptInputFormat extends CommonFileInputFormat {
 
     @Override
     public RecordReader<NullWritable, VertexWritable> createRecordReader(final InputSplit split, final TaskAttemptContext context) throws IOException, InterruptedException {
-        super.loadVertexAndEdgeFilters(context);
-        RecordReader<NullWritable, VertexWritable> reader = new ScriptRecordReader(this.graphFilter);
+        RecordReader<NullWritable, VertexWritable> reader = new ScriptRecordReader();
         reader.initialize(split, context);
         return reader;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptOutputFormat.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptOutputFormat.java
index b57e43e..3e17aca 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptOutputFormat.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptOutputFormat.java
@@ -35,7 +35,7 @@ public final class ScriptOutputFormat extends CommonFileOutputFormat implements
 
     @Override
     public RecordWriter<NullWritable, VertexWritable> getRecordWriter(final TaskAttemptContext job) throws IOException, InterruptedException {
-        return getRecordWriter(job, getDataOuputStream(job));
+        return getRecordWriter(job, getDataOutputStream(job));
     }
 
     public RecordWriter<NullWritable, VertexWritable> getRecordWriter(final TaskAttemptContext job, final DataOutputStream outputStream) throws IOException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java
index 4b40909..69cc567 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java
@@ -29,8 +29,11 @@ import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
 import org.apache.tinkerpop.gremlin.groovy.CompilerCustomizerProvider;
 import org.apache.tinkerpop.gremlin.groovy.DefaultImportCustomizerProvider;
 import org.apache.tinkerpop.gremlin.groovy.jsr223.GremlinGroovyScriptEngine;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
+import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.T;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
@@ -59,17 +62,18 @@ public final class ScriptRecordReader extends RecordReader<NullWritable, VertexW
     private final LineRecordReader lineRecordReader;
     private ScriptEngine engine;
 
-    private final GraphFilter graphFilter;
+    private GraphFilter graphFilter = new GraphFilter();
 
-    public ScriptRecordReader(final GraphFilter graphFilter) {
+    public ScriptRecordReader() {
         this.lineRecordReader = new LineRecordReader();
-        this.graphFilter = graphFilter.clone();
     }
 
     @Override
     public void initialize(final InputSplit genericSplit, final TaskAttemptContext context) throws IOException {
         this.lineRecordReader.initialize(genericSplit, context);
         final Configuration configuration = context.getConfiguration();
+        if (configuration.get(Constants.GREMLIN_HADOOP_GRAPH_FILTER, null) != null)
+            this.graphFilter = VertexProgramHelper.deserialize(ConfUtil.makeApacheConfiguration(configuration), Constants.GREMLIN_HADOOP_GRAPH_FILTER);
         this.engine = new GremlinGroovyScriptEngine((CompilerCustomizerProvider) new DefaultImportCustomizerProvider());
         //this.engine = ScriptEngineCache.get(configuration.get(SCRIPT_ENGINE, ScriptEngineCache.DEFAULT_SCRIPT_ENGINE));
         final FileSystem fs = FileSystem.get(configuration);