You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/02/05 08:09:06 UTC
incubator-tinkerpop git commit: Created a GraphFilterInputFormat
which, if the user provided INPUT_FORMAT is not GraphFilterAware,
it will filter the vertex before sending it up the I/O stack. Thus,
GraphSONInputFormat, which is currently NOT GraphFilter
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/TINKERPOP-962 ed18cd938 -> 502113545
Created a GraphFilterInputFormat which, if the user-provided INPUT_FORMAT is not GraphFilterAware, filters each vertex before sending it up the I/O stack. Thus, GraphSONInputFormat, which is currently NOT GraphFilterAware, is wrapped by GraphFilterInputFormat. This greatly reduces the complexity of dealing with GraphFilters in Hadoop (e.g., Giraph vs. MapReduce). Along the way, I did lots of nice clean up and organization of our various input/output formats and readers/writers. We had a lot of anti-patterns that I have since cleaned up.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/50211354
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/50211354
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/50211354
Branch: refs/heads/TINKERPOP-962
Commit: 5021135451605cfec4212babb721fedeb425c07f
Parents: ed18cd9
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Feb 5 00:09:04 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Feb 5 00:09:04 2016 -0700
----------------------------------------------------------------------
.../process/computer/GiraphGraphComputer.java | 8 +-
.../structure/io/GiraphVertexInputFormat.java | 39 ++------
.../structure/io/GiraphVertexOutputFormat.java | 22 ++--
.../giraph/structure/io/GiraphVertexReader.java | 29 +-----
.../giraph/structure/io/GiraphVertexWriter.java | 11 +-
.../computer/GraphFilterInputFormat.java | 54 ++++++++++
.../computer/GraphFilterRecordReader.java | 100 +++++++++++++++++++
.../process/computer/util/MapReduceHelper.java | 10 +-
.../structure/io/CommonFileInputFormat.java | 15 ---
.../structure/io/CommonFileOutputFormat.java | 2 +-
.../structure/io/ObjectWritableIterator.java | 4 +-
.../structure/io/VertexWritableIterator.java | 4 +-
.../io/graphson/GraphSONOutputFormat.java | 2 +-
.../structure/io/gryo/GryoInputFormat.java | 3 +-
.../structure/io/gryo/GryoOutputFormat.java | 2 +-
.../structure/io/gryo/GryoRecordReader.java | 12 ++-
.../structure/io/script/ScriptInputFormat.java | 6 +-
.../structure/io/script/ScriptOutputFormat.java | 2 +-
.../structure/io/script/ScriptRecordReader.java | 10 +-
19 files changed, 213 insertions(+), 122 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index 7e5a998..fce64a4 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -108,6 +108,7 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
super.program(vertexProgram);
this.memory.addVertexProgramMemoryComputeKeys(this.vertexProgram);
final BaseConfiguration apacheConfiguration = new BaseConfiguration();
+ apacheConfiguration.setDelimiterParsingDisabled(true);
vertexProgram.storeState(apacheConfiguration);
ConfUtil.mergeApacheIntoHadoopConfiguration(apacheConfiguration, this.giraphConfiguration);
this.vertexProgram.getMessageCombiner().ifPresent(combiner -> this.giraphConfiguration.setMessageCombinerClass(GiraphMessageCombiner.class));
@@ -145,7 +146,8 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
this.giraphConfiguration.setBoolean(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, this.persist.equals(Persist.EDGES));
try {
// store vertex and edge filters (will propagate down to native InputFormat or else GiraphVertexInputFormat will process)
- final Configuration apacheConfiguration = new BaseConfiguration();
+ final BaseConfiguration apacheConfiguration = new BaseConfiguration();
+ apacheConfiguration.setDelimiterParsingDisabled(true);
GraphFilterAware.storeGraphFilter(apacheConfiguration, this.giraphConfiguration, this.graphFilter);
// it is possible to run graph computer without a vertex program (and thus, only map reduce jobs if they exist)
@@ -157,8 +159,6 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
if (e.getCause() instanceof NumberFormatException)
throw new NotSerializableException("The provided traversal is not serializable and thus, can not be distributed across the cluster");
}
- // prepare the giraph vertex-centric computing job
- final GiraphJob job = new GiraphJob(this.giraphConfiguration, Constants.GREMLIN_HADOOP_GIRAPH_JOB_PREFIX + this.vertexProgram);
// split required workers across system (open map slots + max threads per machine = total amount of TinkerPop workers)
if (!this.useWorkerThreadsInConfiguration) {
final Cluster cluster = new Cluster(GiraphGraphComputer.this.giraphConfiguration);
@@ -174,6 +174,8 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
this.giraphConfiguration.setNumComputeThreads(threadsPerMapper);
}
}
+ // prepare the giraph vertex-centric computing job
+ final GiraphJob job = new GiraphJob(this.giraphConfiguration, Constants.GREMLIN_HADOOP_GIRAPH_JOB_PREFIX + this.vertexProgram);
// handle input paths (if any)
if (FileInputFormat.class.isAssignableFrom(this.giraphConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class))) {
FileInputFormat.setInputPaths(job.getInternalJob(), Constants.getSearchGraphLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION), storage).get());
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexInputFormat.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexInputFormat.java
index 3bb306b..5900663 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexInputFormat.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexInputFormat.java
@@ -21,18 +21,10 @@ package org.apache.tinkerpop.gremlin.giraph.structure.io;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexReader;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.GraphFilterAware;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
-import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.GraphFilterInputFormat;
import java.io.IOException;
import java.util.List;
@@ -42,11 +34,6 @@ import java.util.List;
*/
public final class GiraphVertexInputFormat extends VertexInputFormat {
- private InputFormat<NullWritable, VertexWritable> hadoopGraphInputFormat;
- protected GraphFilter graphFilter = new GraphFilter();
- private boolean graphFilterLoaded = false;
- private boolean graphFilterAware = false;
-
@Override
public void checkInputSpecs(final Configuration configuration) {
@@ -54,29 +41,17 @@ public final class GiraphVertexInputFormat extends VertexInputFormat {
@Override
public List<InputSplit> getSplits(final JobContext context, final int minSplitCountHint) throws IOException, InterruptedException {
- this.constructor(context.getConfiguration());
- return this.hadoopGraphInputFormat.getSplits(context);
+ return new GraphFilterInputFormat().getSplits(context);
}
@Override
public VertexReader createVertexReader(final InputSplit split, final TaskAttemptContext context) throws IOException {
- this.constructor(context.getConfiguration());
try {
- return new GiraphVertexReader(this.hadoopGraphInputFormat.createRecordReader(split, context), this.graphFilterAware, this.graphFilter);
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }
-
- private final void constructor(final Configuration configuration) {
- if (null == this.hadoopGraphInputFormat) {
- this.hadoopGraphInputFormat = ReflectionUtils.newInstance(configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class, InputFormat.class), configuration);
- this.graphFilterAware = this.hadoopGraphInputFormat instanceof GraphFilterAware;
- if (!this.graphFilterLoaded) {
- if (configuration.get(Constants.GREMLIN_HADOOP_GRAPH_FILTER, null) != null)
- this.graphFilter = VertexProgramHelper.deserialize(ConfUtil.makeApacheConfiguration(configuration), Constants.GREMLIN_HADOOP_GRAPH_FILTER);
- this.graphFilterLoaded = true;
- }
+ final GiraphVertexReader reader = new GiraphVertexReader();
+ reader.initialize(split, context);
+ return reader;
+ } catch (final InterruptedException e) {
+ throw new IOException(e.getMessage(), e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexOutputFormat.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexOutputFormat.java
index 2d9b4ba..4689db1 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexOutputFormat.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexOutputFormat.java
@@ -18,17 +18,15 @@
*/
package org.apache.tinkerpop.gremlin.giraph.structure.io;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.io.VertexWriter;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
import java.io.IOException;
@@ -37,29 +35,21 @@ import java.io.IOException;
*/
public final class GiraphVertexOutputFormat extends VertexOutputFormat {
- private OutputFormat<NullWritable, VertexWritable> hadoopGraphOutputFormat;
-
@Override
public VertexWriter createVertexWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
- this.constructor(context.getConfiguration());
- return new GiraphVertexWriter(this.hadoopGraphOutputFormat);
+ return new GiraphVertexWriter();
}
@Override
public void checkOutputSpecs(final JobContext context) throws IOException, InterruptedException {
- this.constructor(context.getConfiguration());
- this.hadoopGraphOutputFormat.checkOutputSpecs(context);
+ final Configuration configuration = context.getConfiguration();
+ ReflectionUtils.newInstance(configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class, OutputFormat.class), configuration).checkOutputSpecs(context);
}
@Override
public OutputCommitter getOutputCommitter(final TaskAttemptContext context) throws IOException, InterruptedException {
- this.constructor(context.getConfiguration());
- return this.hadoopGraphOutputFormat.getOutputCommitter(context);
+ final Configuration configuration = context.getConfiguration();
+ return ReflectionUtils.newInstance(configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class, OutputFormat.class), configuration).getOutputCommitter(context);
}
- private final void constructor(final Configuration configuration) {
- if (null == this.hadoopGraphOutputFormat) {
- this.hadoopGraphOutputFormat = ReflectionUtils.newInstance(configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class, OutputFormat.class), configuration);
- }
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexReader.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexReader.java
index 450e298..5335980 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexReader.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexReader.java
@@ -25,12 +25,10 @@ import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.tinkerpop.gremlin.giraph.process.computer.GiraphVertex;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.GraphFilterRecordReader;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
-import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
import java.io.IOException;
-import java.util.Optional;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -38,13 +36,9 @@ import java.util.Optional;
public final class GiraphVertexReader extends VertexReader {
private RecordReader<NullWritable, VertexWritable> recordReader;
- private final GraphFilter graphFilter;
- private final boolean graphFilterAware;
- public GiraphVertexReader(final RecordReader<NullWritable, VertexWritable> recordReader, final boolean graphFilterAware, final GraphFilter graphFilter) {
- this.recordReader = recordReader;
- this.graphFilterAware = graphFilterAware;
- this.graphFilter = graphFilter.clone();
+ public GiraphVertexReader() {
+ this.recordReader = new GraphFilterRecordReader();
}
@Override
@@ -54,22 +48,7 @@ public final class GiraphVertexReader extends VertexReader {
@Override
public boolean nextVertex() throws IOException, InterruptedException {
- if (this.graphFilterAware) {
- return this.recordReader.nextKeyValue();
- } else {
- while (true) {
- if (this.recordReader.nextKeyValue()) {
- final VertexWritable vertexWritable = this.recordReader.getCurrentValue();
- final Optional<StarGraph.StarVertex> vertex = this.graphFilter.applyGraphFilter(vertexWritable.get());
- if (vertex.isPresent()) {
- vertexWritable.set(vertex.get());
- return true;
- }
- } else {
- return false;
- }
- }
- }
+ return this.recordReader.nextKeyValue();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexWriter.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexWriter.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexWriter.java
index 43c4b0b..9bf5c13 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexWriter.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexWriter.java
@@ -20,11 +20,14 @@ package org.apache.tinkerpop.gremlin.giraph.structure.io;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.VertexWriter;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tinkerpop.gremlin.giraph.process.computer.GiraphVertex;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import java.io.IOException;
@@ -33,16 +36,16 @@ import java.io.IOException;
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public final class GiraphVertexWriter extends VertexWriter {
- private final OutputFormat<NullWritable, VertexWritable> outputFormat;
private RecordWriter<NullWritable, VertexWritable> recordWriter;
- public GiraphVertexWriter(final OutputFormat<NullWritable, VertexWritable> outputFormat) {
- this.outputFormat = outputFormat;
+ public GiraphVertexWriter() {
+
}
@Override
public void initialize(final TaskAttemptContext context) throws IOException, InterruptedException {
- this.recordWriter = this.outputFormat.getRecordWriter(context);
+ final Configuration configuration = context.getConfiguration();
+ this.recordWriter = ReflectionUtils.newInstance(configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class, OutputFormat.class), configuration).getRecordWriter(context);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/GraphFilterInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/GraphFilterInputFormat.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/GraphFilterInputFormat.java
new file mode 100644
index 0000000..1d1b150
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/GraphFilterInputFormat.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.process.computer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GraphFilterInputFormat extends InputFormat<NullWritable, VertexWritable> {
+
+ @Override
+ public List<InputSplit> getSplits(final JobContext jobContext) throws IOException, InterruptedException {
+ final Configuration configuration = jobContext.getConfiguration();
+ return ReflectionUtils.newInstance(configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class, InputFormat.class), configuration).getSplits(jobContext);
+ }
+
+ @Override
+ public RecordReader<NullWritable, VertexWritable> createRecordReader(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+ final GraphFilterRecordReader recordReader = new GraphFilterRecordReader();
+ recordReader.initialize(inputSplit, taskAttemptContext);
+ return recordReader;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/GraphFilterRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/GraphFilterRecordReader.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/GraphFilterRecordReader.java
new file mode 100644
index 0000000..e2114ef
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/GraphFilterRecordReader.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.process.computer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.GraphFilterAware;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
+import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
+import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GraphFilterRecordReader extends RecordReader<NullWritable, VertexWritable> {
+
+ private GraphFilter graphFilter = null;
+ private RecordReader<NullWritable, VertexWritable> recordReader;
+
+ public GraphFilterRecordReader() {
+ }
+
+ @Override
+ public void initialize(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+ final Configuration configuration = taskAttemptContext.getConfiguration();
+ final InputFormat<NullWritable, VertexWritable> inputFormat = ReflectionUtils.newInstance(configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class, InputFormat.class), configuration);
+ if (!(inputFormat instanceof GraphFilterAware) && configuration.get(Constants.GREMLIN_HADOOP_GRAPH_FILTER, null) != null)
+ this.graphFilter = VertexProgramHelper.deserialize(ConfUtil.makeApacheConfiguration(configuration), Constants.GREMLIN_HADOOP_GRAPH_FILTER);
+ this.recordReader = inputFormat.createRecordReader(inputSplit, taskAttemptContext);
+ this.recordReader.initialize(inputSplit, taskAttemptContext);
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (null == this.graphFilter) {
+ return this.recordReader.nextKeyValue();
+ } else {
+ while (true) {
+ if (this.recordReader.nextKeyValue()) {
+ final VertexWritable vertexWritable = this.recordReader.getCurrentValue();
+ final Optional<StarGraph.StarVertex> vertex = this.graphFilter.applyGraphFilter(vertexWritable.get());
+ if (vertex.isPresent()) {
+ vertexWritable.set(vertex.get());
+ return true;
+ }
+ } else {
+ return false;
+ }
+ }
+ }
+ }
+
+ @Override
+ public NullWritable getCurrentKey() throws IOException, InterruptedException {
+ return NullWritable.get();
+ }
+
+ @Override
+ public VertexWritable getCurrentValue() throws IOException, InterruptedException {
+ return this.recordReader.getCurrentValue();
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return this.recordReader.getProgress();
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.recordReader.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java
index 6e0cd9e..5cf5d1e 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java
@@ -22,7 +22,6 @@ import org.apache.commons.configuration.BaseConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
@@ -32,6 +31,7 @@ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.GraphFilterInputFormat;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.HadoopCombine;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.HadoopMap;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.HadoopReduce;
@@ -72,6 +72,8 @@ public final class MapReduceHelper {
final Optional<Comparator<?>> reduceSort = mapReduce.getReduceKeySort();
newConfiguration.setClass(Constants.GREMLIN_HADOOP_MAP_REDUCE_CLASS, mapReduce.getClass(), MapReduce.class);
+ if (vertexProgramExists)
+ newConfiguration.set(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputOutputHelper.getInputFormat((Class) newConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class)).getCanonicalName());
final Job job = Job.getInstance(newConfiguration, mapReduce.toString());
HadoopGraph.LOGGER.info(Constants.GREMLIN_HADOOP_JOB_PREFIX + mapReduce.toString());
job.setJarByClass(HadoopGraph.class);
@@ -94,9 +96,7 @@ public final class MapReduceHelper {
job.setMapOutputValueClass(ObjectWritable.class);
job.setOutputKeyClass(ObjectWritable.class);
job.setOutputValueClass(ObjectWritable.class);
- job.setInputFormatClass(vertexProgramExists ?
- InputOutputHelper.getInputFormat((Class) newConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class)) :
- (Class) newConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class));
+ job.setInputFormatClass(GraphFilterInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
// if there is no vertex program, then grab the graph from the input location
final Path graphPath = vertexProgramExists ?
@@ -130,7 +130,7 @@ public final class MapReduceHelper {
FileSystem.get(newConfiguration).delete(memoryPath, true); // delete the temporary memory path
memoryPath = sortedMemoryPath;
}
- mapReduce.addResultToMemory(memory, new ObjectWritableIterator(configuration, memoryPath));
+ mapReduce.addResultToMemory(memory, new ObjectWritableIterator(newConfiguration, memoryPath));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileInputFormat.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileInputFormat.java
index 9f7d458..e01cfab 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileInputFormat.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileInputFormat.java
@@ -23,29 +23,14 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
-import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public abstract class CommonFileInputFormat extends FileInputFormat<NullWritable, VertexWritable> implements HadoopPoolsConfigurable, GraphFilterAware {
- protected GraphFilter graphFilter = new GraphFilter();
- private boolean graphFilterLoaded = false;
-
- protected void loadVertexAndEdgeFilters(final TaskAttemptContext context) {
- if (!this.graphFilterLoaded) {
- if (context.getConfiguration().get(Constants.GREMLIN_HADOOP_GRAPH_FILTER, null) != null)
- this.graphFilter = VertexProgramHelper.deserialize(ConfUtil.makeApacheConfiguration(context.getConfiguration()), Constants.GREMLIN_HADOOP_GRAPH_FILTER);
- this.graphFilterLoaded = true;
- }
- }
-
@Override
protected boolean isSplitable(final JobContext context, final Path file) {
return null == new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileOutputFormat.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileOutputFormat.java
index 1b7559a..5031abe 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileOutputFormat.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileOutputFormat.java
@@ -38,7 +38,7 @@ import java.io.IOException;
*/
public abstract class CommonFileOutputFormat extends FileOutputFormat<NullWritable, VertexWritable> implements PersistResultGraphAware {
- protected DataOutputStream getDataOuputStream(final TaskAttemptContext job) throws IOException, InterruptedException {
+ protected DataOutputStream getDataOutputStream(final TaskAttemptContext job) throws IOException, InterruptedException {
final Configuration conf = job.getConfiguration();
boolean isCompressed = getCompressOutput(job);
CompressionCodec codec = null;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritableIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritableIterator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritableIterator.java
index 676ca07..5c90c5b 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritableIterator.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritableIterator.java
@@ -24,11 +24,11 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
+import java.util.NoSuchElementException;
import java.util.Queue;
/**
@@ -77,7 +77,7 @@ public final class ObjectWritableIterator implements Iterator<KeyValue> {
} else {
while (true) {
if (this.readers.isEmpty())
- throw FastNoSuchElementException.instance();
+ throw new NoSuchElementException();
if (this.readers.peek().next(this.key, this.value)) {
return new KeyValue<>(this.key.get(), this.value.get());
} else
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritableIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritableIterator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritableIterator.java
index d3e1fd0..835262f 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritableIterator.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritableIterator.java
@@ -23,12 +23,12 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
-import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
+import java.util.NoSuchElementException;
import java.util.Queue;
/**
@@ -76,7 +76,7 @@ public final class VertexWritableIterator implements Iterator<Vertex> {
} else {
while (true) {
if (this.readers.isEmpty())
- throw FastNoSuchElementException.instance();
+ throw new NoSuchElementException();
if (this.readers.peek().next(this.value)) {
return this.value.get();
} else
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONOutputFormat.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONOutputFormat.java
index 30a8cfa..4698e61 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONOutputFormat.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONOutputFormat.java
@@ -34,7 +34,7 @@ public final class GraphSONOutputFormat extends CommonFileOutputFormat implement
@Override
public RecordWriter<NullWritable, VertexWritable> getRecordWriter(final TaskAttemptContext job) throws IOException, InterruptedException {
- return new GraphSONRecordWriter(getDataOuputStream(job), job.getConfiguration());
+ return new GraphSONRecordWriter(getDataOutputStream(job), job.getConfiguration());
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoInputFormat.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoInputFormat.java
index 78ece7c..d2b48ee 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoInputFormat.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoInputFormat.java
@@ -34,8 +34,7 @@ public final class GryoInputFormat extends CommonFileInputFormat {
@Override
public RecordReader<NullWritable, VertexWritable> createRecordReader(final InputSplit split, final TaskAttemptContext context) throws IOException, InterruptedException {
- super.loadVertexAndEdgeFilters(context);
- final RecordReader<NullWritable, VertexWritable> reader = new GryoRecordReader(this.graphFilter);
+ final RecordReader<NullWritable, VertexWritable> reader = new GryoRecordReader();
reader.initialize(split, context);
return reader;
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoOutputFormat.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoOutputFormat.java
index 69d56ce..4f923ff 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoOutputFormat.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoOutputFormat.java
@@ -34,7 +34,7 @@ public final class GryoOutputFormat extends CommonFileOutputFormat implements Ha
@Override
public RecordWriter<NullWritable, VertexWritable> getRecordWriter(final TaskAttemptContext job) throws IOException, InterruptedException {
- return new GryoRecordWriter(getDataOuputStream(job), job.getConfiguration());
+ return new GryoRecordWriter(getDataOutputStream(job), job.getConfiguration());
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
index 7ff362f..d7ed46b 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
@@ -27,9 +27,12 @@ import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
+import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper;
import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader;
@@ -56,17 +59,18 @@ public final class GryoRecordReader extends RecordReader<NullWritable, VertexWri
private long currentLength = 0;
private long splitLength;
- private final GraphFilter graphFilter;
+ private GraphFilter graphFilter = new GraphFilter();
+
+ public GryoRecordReader() {
- public GryoRecordReader(final GraphFilter graphFilter) {
- this.graphFilter = graphFilter.clone();
- this.graphFilter.compileFilters();
}
@Override
public void initialize(final InputSplit genericSplit, final TaskAttemptContext context) throws IOException {
final FileSplit split = (FileSplit) genericSplit;
final Configuration configuration = context.getConfiguration();
+ if (configuration.get(Constants.GREMLIN_HADOOP_GRAPH_FILTER, null) != null)
+ this.graphFilter = VertexProgramHelper.deserialize(ConfUtil.makeApacheConfiguration(configuration), Constants.GREMLIN_HADOOP_GRAPH_FILTER);
HadoopPools.initialize(configuration);
this.gryoReader = HadoopPools.getGryoPool().takeReader();
long start = split.getStart();
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptInputFormat.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptInputFormat.java
index 8b9af01..4053949 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptInputFormat.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptInputFormat.java
@@ -18,11 +18,8 @@
*/
package org.apache.tinkerpop.gremlin.hadoop.structure.io.script;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.CommonFileInputFormat;
@@ -38,8 +35,7 @@ public final class ScriptInputFormat extends CommonFileInputFormat {
@Override
public RecordReader<NullWritable, VertexWritable> createRecordReader(final InputSplit split, final TaskAttemptContext context) throws IOException, InterruptedException {
- super.loadVertexAndEdgeFilters(context);
- RecordReader<NullWritable, VertexWritable> reader = new ScriptRecordReader(this.graphFilter);
+ RecordReader<NullWritable, VertexWritable> reader = new ScriptRecordReader();
reader.initialize(split, context);
return reader;
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptOutputFormat.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptOutputFormat.java
index b57e43e..3e17aca 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptOutputFormat.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptOutputFormat.java
@@ -35,7 +35,7 @@ public final class ScriptOutputFormat extends CommonFileOutputFormat implements
@Override
public RecordWriter<NullWritable, VertexWritable> getRecordWriter(final TaskAttemptContext job) throws IOException, InterruptedException {
- return getRecordWriter(job, getDataOuputStream(job));
+ return getRecordWriter(job, getDataOutputStream(job));
}
public RecordWriter<NullWritable, VertexWritable> getRecordWriter(final TaskAttemptContext job, final DataOutputStream outputStream) throws IOException, InterruptedException {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/50211354/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java
index 4b40909..69cc567 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java
@@ -29,8 +29,11 @@ import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.tinkerpop.gremlin.groovy.CompilerCustomizerProvider;
import org.apache.tinkerpop.gremlin.groovy.DefaultImportCustomizerProvider;
import org.apache.tinkerpop.gremlin.groovy.jsr223.GremlinGroovyScriptEngine;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
+import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.Vertex;
@@ -59,17 +62,18 @@ public final class ScriptRecordReader extends RecordReader<NullWritable, VertexW
private final LineRecordReader lineRecordReader;
private ScriptEngine engine;
- private final GraphFilter graphFilter;
+ private GraphFilter graphFilter = new GraphFilter();
- public ScriptRecordReader(final GraphFilter graphFilter) {
+ public ScriptRecordReader() {
this.lineRecordReader = new LineRecordReader();
- this.graphFilter = graphFilter.clone();
}
@Override
public void initialize(final InputSplit genericSplit, final TaskAttemptContext context) throws IOException {
this.lineRecordReader.initialize(genericSplit, context);
final Configuration configuration = context.getConfiguration();
+ if (configuration.get(Constants.GREMLIN_HADOOP_GRAPH_FILTER, null) != null)
+ this.graphFilter = VertexProgramHelper.deserialize(ConfUtil.makeApacheConfiguration(configuration), Constants.GREMLIN_HADOOP_GRAPH_FILTER);
this.engine = new GremlinGroovyScriptEngine((CompilerCustomizerProvider) new DefaultImportCustomizerProvider());
//this.engine = ScriptEngineCache.get(configuration.get(SCRIPT_ENGINE, ScriptEngineCache.DEFAULT_SCRIPT_ENGINE));
final FileSystem fs = FileSystem.get(configuration);