You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by dk...@apache.org on 2015/11/03 17:48:55 UTC
[08/14] incubator-tinkerpop git commit: @RussellSpitzer has schooled
me in cache()/unpersist(). I now am smart to unpersist() RDDs that are just
dangling around (especially since we now have persistent contexts). Finally,
I did some refactoring of packag
@RussellSpitzer has schooled me in cache()/unpersist(). I am now smart enough to unpersist() RDDs that are just dangling around (especially since we now have persistent contexts). Finally, I did some refactoring of packages (non-breaking) as I'm starting to see the future of what SparkGraph looks like (in other words, being able to use SparkGraph w/o HadoopGraph (back SparkGraph by an RDD)). Anywho, that refactor cleaned this up for now and for the future so it's good.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/cb972387
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/cb972387
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/cb972387
Branch: refs/heads/TINKERPOP3-909
Commit: cb972387bd369619a39b3ced215997703a8bb9e9
Parents: eeb0d9f
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Oct 30 14:30:10 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Oct 30 14:30:10 2015 -0600
----------------------------------------------------------------------
docs/src/implementations.asciidoc | 4 +-
.../process/computer/GiraphGraphComputer.java | 8 +-
.../computer/io/GiraphVertexInputFormat.java | 70 ------
.../computer/io/GiraphVertexOutputFormat.java | 65 ------
.../process/computer/io/GiraphVertexReader.java | 67 ------
.../process/computer/io/GiraphVertexWriter.java | 57 -----
.../structure/io/GiraphVertexInputFormat.java | 70 ++++++
.../structure/io/GiraphVertexOutputFormat.java | 65 ++++++
.../giraph/structure/io/GiraphVertexReader.java | 67 ++++++
.../giraph/structure/io/GiraphVertexWriter.java | 57 +++++
.../hadoop/structure/io/InputOutputHelper.java | 22 ++
.../hadoop/structure/util/HadoopHelper.java | 50 -----
.../process/computer/SparkGraphComputer.java | 15 +-
.../process/computer/io/InputFormatRDD.java | 47 ----
.../spark/process/computer/io/InputRDD.java | 41 ----
.../process/computer/io/OutputFormatRDD.java | 49 -----
.../spark/process/computer/io/OutputRDD.java | 31 ---
.../process/computer/io/PersistedInputRDD.java | 60 -----
.../process/computer/io/PersistedOutputRDD.java | 41 ----
.../spark/structure/io/InputFormatRDD.java | 47 ++++
.../spark/structure/io/InputOutputHelper.java | 81 +++++++
.../gremlin/spark/structure/io/InputRDD.java | 41 ++++
.../spark/structure/io/OutputFormatRDD.java | 49 +++++
.../gremlin/spark/structure/io/OutputRDD.java | 31 +++
.../spark/structure/io/PersistedInputRDD.java | 60 +++++
.../spark/structure/io/PersistedOutputRDD.java | 41 ++++
.../process/computer/io/ExampleInputRDD.java | 47 ----
.../process/computer/io/ExampleOutputRDD.java | 45 ----
.../process/computer/io/InputOutputRDDTest.java | 59 -----
.../spark/process/computer/io/InputRDDTest.java | 54 -----
.../process/computer/io/OutputRDDTest.java | 62 ------
.../io/PersistedInputOutputRDDTest.java | 172 ---------------
.../spark/structure/io/ExampleInputRDD.java | 48 ++++
.../spark/structure/io/ExampleOutputRDD.java | 46 ++++
.../spark/structure/io/InputOutputRDDTest.java | 59 +++++
.../spark/structure/io/InputRDDTest.java | 54 +++++
.../spark/structure/io/OutputRDDTest.java | 62 ++++++
.../io/PersistedInputOutputRDDTest.java | 217 +++++++++++++++++++
38 files changed, 1131 insertions(+), 1030 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/docs/src/implementations.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/implementations.asciidoc b/docs/src/implementations.asciidoc
index 8110c5f..7792665 100644
--- a/docs/src/implementations.asciidoc
+++ b/docs/src/implementations.asciidoc
@@ -1001,8 +1001,8 @@ spark.serializer=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerial
####################################
# SparkGraphComputer Configuration #
####################################
-gremlin.spark.graphInputRDD=org.apache.tinkerpop.gremlin.spark.process.computer.io.InputRDDFormat
-gremlin.spark.graphOutputRDD=org.apache.tinkerpop.gremlin.spark.process.computer.io.OutputRDDFormat
+gremlin.spark.graphInputRDD=org.apache.tinkerpop.gremlin.spark.structure.io.InputFormatRDD
+gremlin.spark.graphOutputRDD=org.apache.tinkerpop.gremlin.spark.structure.io.OutputFormatRDD
gremlin.spark.persistContext=true
#####################################
# GiraphGraphComputer Configuration #
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index 4c33220..0389750 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -34,17 +34,17 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import org.apache.tinkerpop.gremlin.giraph.process.computer.io.GiraphVertexInputFormat;
-import org.apache.tinkerpop.gremlin.giraph.process.computer.io.GiraphVertexOutputFormat;
+import org.apache.tinkerpop.gremlin.giraph.structure.io.GiraphVertexInputFormat;
+import org.apache.tinkerpop.gremlin.giraph.structure.io.GiraphVertexOutputFormat;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.MapReduceHelper;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.InputOutputHelper;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.HadoopHelper;
import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
@@ -126,7 +126,7 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
}
this.memory.setRuntime(System.currentTimeMillis() - startTime);
- return new DefaultComputerResult(HadoopHelper.getOutputGraph(this.hadoopGraph, this.resultGraph, this.persist), this.memory.asImmutable());
+ return new DefaultComputerResult(InputOutputHelper.getOutputGraph(ConfUtil.makeApacheConfiguration(this.giraphConfiguration), this.resultGraph, this.persist), this.memory.asImmutable());
});
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexInputFormat.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexInputFormat.java
deleted file mode 100644
index e5407da..0000000
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexInputFormat.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.giraph.process.computer.io;
-
-import org.apache.giraph.io.VertexInputFormat;
-import org.apache.giraph.io.VertexReader;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GiraphVertexInputFormat extends VertexInputFormat {
-
- private InputFormat<NullWritable, VertexWritable> hadoopGraphInputFormat;
-
- @Override
- public void checkInputSpecs(final Configuration configuration) {
-
- }
-
- @Override
- public List<InputSplit> getSplits(final JobContext context, final int minSplitCountHint) throws IOException, InterruptedException {
- this.constructor(context.getConfiguration());
- return this.hadoopGraphInputFormat.getSplits(context);
- }
-
- @Override
- public VertexReader createVertexReader(final InputSplit split, final TaskAttemptContext context) throws IOException {
- this.constructor(context.getConfiguration());
- try {
- return new GiraphVertexReader(this.hadoopGraphInputFormat.createRecordReader(split, context));
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }
-
- private final void constructor(final Configuration configuration) {
- if (null == this.hadoopGraphInputFormat) {
- this.hadoopGraphInputFormat = ReflectionUtils.newInstance(configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class, InputFormat.class), configuration);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexOutputFormat.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexOutputFormat.java
deleted file mode 100644
index c1360c7..0000000
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexOutputFormat.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.giraph.process.computer.io;
-
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.giraph.io.VertexOutputFormat;
-import org.apache.giraph.io.VertexWriter;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import java.io.IOException;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GiraphVertexOutputFormat extends VertexOutputFormat {
-
- private OutputFormat<NullWritable, VertexWritable> hadoopGraphOutputFormat;
-
- @Override
- public VertexWriter createVertexWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
- this.constructor(context.getConfiguration());
- return new GiraphVertexWriter(this.hadoopGraphOutputFormat);
- }
-
- @Override
- public void checkOutputSpecs(final JobContext context) throws IOException, InterruptedException {
- this.constructor(context.getConfiguration());
- this.hadoopGraphOutputFormat.checkOutputSpecs(context);
- }
-
- @Override
- public OutputCommitter getOutputCommitter(final TaskAttemptContext context) throws IOException, InterruptedException {
- this.constructor(context.getConfiguration());
- return this.hadoopGraphOutputFormat.getOutputCommitter(context);
- }
-
- private final void constructor(final Configuration configuration) {
- if (null == this.hadoopGraphOutputFormat) {
- this.hadoopGraphOutputFormat = ReflectionUtils.newInstance(configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class, OutputFormat.class), configuration);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexReader.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexReader.java
deleted file mode 100644
index c8ed797..0000000
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexReader.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.giraph.process.computer.io;
-
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.io.VertexReader;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.tinkerpop.gremlin.giraph.process.computer.GiraphVertex;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-
-import java.io.IOException;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GiraphVertexReader extends VertexReader {
-
- private RecordReader<NullWritable, VertexWritable> recordReader;
-
- public GiraphVertexReader(final RecordReader<NullWritable, VertexWritable> recordReader) {
- this.recordReader = recordReader;
- }
-
- @Override
- public void initialize(final InputSplit inputSplit, final TaskAttemptContext context) throws IOException, InterruptedException {
- this.recordReader.initialize(inputSplit, context);
- }
-
- @Override
- public boolean nextVertex() throws IOException, InterruptedException {
- return this.recordReader.nextKeyValue();
- }
-
- @Override
- public Vertex getCurrentVertex() throws IOException, InterruptedException {
- return new GiraphVertex(this.recordReader.getCurrentValue());
- }
-
- @Override
- public void close() throws IOException {
- this.recordReader.close();
- }
-
- @Override
- public float getProgress() throws IOException, InterruptedException {
- return this.recordReader.getProgress();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexWriter.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexWriter.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexWriter.java
deleted file mode 100644
index d67b736..0000000
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexWriter.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.giraph.process.computer.io;
-
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.io.VertexWriter;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.tinkerpop.gremlin.giraph.process.computer.GiraphVertex;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-
-import java.io.IOException;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GiraphVertexWriter extends VertexWriter {
- private final OutputFormat<NullWritable, VertexWritable> outputFormat;
- private RecordWriter<NullWritable, VertexWritable> recordWriter;
-
- public GiraphVertexWriter(final OutputFormat<NullWritable, VertexWritable> outputFormat) {
- this.outputFormat = outputFormat;
- }
-
- @Override
- public void initialize(final TaskAttemptContext context) throws IOException, InterruptedException {
- this.recordWriter = this.outputFormat.getRecordWriter(context);
- }
-
- @Override
- public void close(final TaskAttemptContext context) throws IOException, InterruptedException {
- this.recordWriter.close(context);
- }
-
- @Override
- public void writeVertex(final Vertex vertex) throws IOException, InterruptedException {
- this.recordWriter.write(NullWritable.get(), ((GiraphVertex) vertex).getValue());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexInputFormat.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexInputFormat.java
new file mode 100644
index 0000000..6b6638a
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexInputFormat.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.giraph.structure.io;
+
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphVertexInputFormat extends VertexInputFormat {
+
+ private InputFormat<NullWritable, VertexWritable> hadoopGraphInputFormat;
+
+ @Override
+ public void checkInputSpecs(final Configuration configuration) {
+
+ }
+
+ @Override
+ public List<InputSplit> getSplits(final JobContext context, final int minSplitCountHint) throws IOException, InterruptedException {
+ this.constructor(context.getConfiguration());
+ return this.hadoopGraphInputFormat.getSplits(context);
+ }
+
+ @Override
+ public VertexReader createVertexReader(final InputSplit split, final TaskAttemptContext context) throws IOException {
+ this.constructor(context.getConfiguration());
+ try {
+ return new GiraphVertexReader(this.hadoopGraphInputFormat.createRecordReader(split, context));
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private final void constructor(final Configuration configuration) {
+ if (null == this.hadoopGraphInputFormat) {
+ this.hadoopGraphInputFormat = ReflectionUtils.newInstance(configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class, InputFormat.class), configuration);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexOutputFormat.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexOutputFormat.java
new file mode 100644
index 0000000..2d9b4ba
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexOutputFormat.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.giraph.structure.io;
+
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.io.VertexWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.io.IOException;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphVertexOutputFormat extends VertexOutputFormat {
+
+ private OutputFormat<NullWritable, VertexWritable> hadoopGraphOutputFormat;
+
+ @Override
+ public VertexWriter createVertexWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
+ this.constructor(context.getConfiguration());
+ return new GiraphVertexWriter(this.hadoopGraphOutputFormat);
+ }
+
+ @Override
+ public void checkOutputSpecs(final JobContext context) throws IOException, InterruptedException {
+ this.constructor(context.getConfiguration());
+ this.hadoopGraphOutputFormat.checkOutputSpecs(context);
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(final TaskAttemptContext context) throws IOException, InterruptedException {
+ this.constructor(context.getConfiguration());
+ return this.hadoopGraphOutputFormat.getOutputCommitter(context);
+ }
+
+ private final void constructor(final Configuration configuration) {
+ if (null == this.hadoopGraphOutputFormat) {
+ this.hadoopGraphOutputFormat = ReflectionUtils.newInstance(configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class, OutputFormat.class), configuration);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexReader.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexReader.java
new file mode 100644
index 0000000..2a6ade6
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexReader.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.giraph.structure.io;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.tinkerpop.gremlin.giraph.process.computer.GiraphVertex;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+
+import java.io.IOException;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphVertexReader extends VertexReader {
+
+ private RecordReader<NullWritable, VertexWritable> recordReader;
+
+ public GiraphVertexReader(final RecordReader<NullWritable, VertexWritable> recordReader) {
+ this.recordReader = recordReader;
+ }
+
+ @Override
+ public void initialize(final InputSplit inputSplit, final TaskAttemptContext context) throws IOException, InterruptedException {
+ this.recordReader.initialize(inputSplit, context);
+ }
+
+ @Override
+ public boolean nextVertex() throws IOException, InterruptedException {
+ return this.recordReader.nextKeyValue();
+ }
+
+ @Override
+ public Vertex getCurrentVertex() throws IOException, InterruptedException {
+ return new GiraphVertex(this.recordReader.getCurrentValue());
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.recordReader.close();
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return this.recordReader.getProgress();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexWriter.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexWriter.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexWriter.java
new file mode 100644
index 0000000..43c4b0b
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexWriter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.giraph.structure.io;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.VertexWriter;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.tinkerpop.gremlin.giraph.process.computer.GiraphVertex;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+
+import java.io.IOException;
+
+/** Giraph {@code VertexWriter} that writes vertices through a Hadoop {@code RecordWriter} obtained from the supplied {@code OutputFormat}.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphVertexWriter extends VertexWriter {
+ private final OutputFormat<NullWritable, VertexWritable> outputFormat;
+ private RecordWriter<NullWritable, VertexWritable> recordWriter; // not set by the constructor; assigned in initialize()
+
+ public GiraphVertexWriter(final OutputFormat<NullWritable, VertexWritable> outputFormat) {
+ this.outputFormat = outputFormat;
+ }
+
+ @Override
+ public void initialize(final TaskAttemptContext context) throws IOException, InterruptedException {
+ this.recordWriter = this.outputFormat.getRecordWriter(context); // the writer is created here because it needs the task context
+ }
+
+ @Override
+ public void close(final TaskAttemptContext context) throws IOException, InterruptedException {
+ this.recordWriter.close(context);
+ }
+
+ @Override
+ public void writeVertex(final Vertex vertex) throws IOException, InterruptedException {
+ this.recordWriter.write(NullWritable.get(), ((GiraphVertex) vertex).getValue()); // key is always NullWritable; the cast means any Vertex that is not a GiraphVertex fails with ClassCastException
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/InputOutputHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/InputOutputHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/InputOutputHelper.java
index 8a80053..04097c1 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/InputOutputHelper.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/InputOutputHelper.java
@@ -18,15 +18,21 @@
*/
package org.apache.tinkerpop.gremlin.hadoop.structure.io;
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONOutputFormat;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.script.ScriptInputFormat;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.script.ScriptOutputFormat;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -65,4 +71,20 @@ public final class InputOutputHelper {
INPUT_TO_OUTPUT_CACHE.put(inputFormat, outputFormat);
OUTPUT_TO_INPUT_CACHE.put(outputFormat, inputFormat);
}
+
+ public static HadoopGraph getOutputGraph(final Configuration configuration, final GraphComputer.ResultGraph resultGraph, final GraphComputer.Persist persist) { // opens a HadoopGraph over an adjusted copy of configuration
+ final HadoopConfiguration hadoopConfiguration = new HadoopConfiguration(configuration);
+ final BaseConfiguration newConfiguration = new BaseConfiguration();
+ newConfiguration.copy(hadoopConfiguration); // every setProperty below targets this copy
+ if (resultGraph.equals(GraphComputer.ResultGraph.NEW)) {
+ newConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, hadoopConfiguration.getOutputLocation() + "/" + Constants.HIDDEN_G); // input location becomes <output location>/HIDDEN_G
+ if (hadoopConfiguration.containsKey(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT)) // input format is only rewritten when an output format is configured
+ newConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputOutputHelper.getInputFormat(hadoopConfiguration.getGraphOutputFormat()).getCanonicalName()); // stored as the canonical class name of whatever getInputFormat returns for the output format
+ newConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, hadoopConfiguration.getOutputLocation() + "_"); // NOTE(review): presumably the "_" suffix keeps later output away from the location now used as input -- confirm
+ newConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT_HAS_EDGES, persist.equals(GraphComputer.Persist.EDGES)); // true only for Persist.EDGES
+ } else {
+ newConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, hadoopConfiguration.getOutputLocation() + "_"); // any other ResultGraph value: only the output location changes
+ }
+ return HadoopGraph.open(newConfiguration);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/util/HadoopHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/util/HadoopHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/util/HadoopHelper.java
deleted file mode 100644
index ebfdf91..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/util/HadoopHelper.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.structure.util;
-
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.InputOutputHelper;
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class HadoopHelper {
-
- private HadoopHelper() {
- }
-
- public static HadoopGraph getOutputGraph(final HadoopGraph hadoopGraph, final GraphComputer.ResultGraph resultGraph, final GraphComputer.Persist persist) {
- final BaseConfiguration newConfiguration = new BaseConfiguration();
- newConfiguration.copy(hadoopGraph.configuration());
- if (resultGraph.equals(GraphComputer.ResultGraph.NEW)) {
- newConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, hadoopGraph.configuration().getOutputLocation() + "/" + Constants.HIDDEN_G);
- if (hadoopGraph.configuration().containsKey(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT))
- newConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputOutputHelper.getInputFormat(hadoopGraph.configuration().getGraphOutputFormat()).getCanonicalName());
- newConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, hadoopGraph.configuration().getOutputLocation() + "_");
- newConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT_HAS_EDGES, persist.equals(GraphComputer.Persist.EDGES));
- } else {
- newConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, hadoopGraph.configuration().getOutputLocation() + "_");
- }
- return HadoopGraph.open(newConfiguration);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 0656f5a..b25073b 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -37,7 +37,6 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.HadoopHelper;
import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
@@ -45,11 +44,12 @@ import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
-import org.apache.tinkerpop.gremlin.spark.process.computer.io.InputFormatRDD;
-import org.apache.tinkerpop.gremlin.spark.process.computer.io.InputRDD;
-import org.apache.tinkerpop.gremlin.spark.process.computer.io.OutputFormatRDD;
-import org.apache.tinkerpop.gremlin.spark.process.computer.io.OutputRDD;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
+import org.apache.tinkerpop.gremlin.spark.structure.io.InputFormatRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.InputOutputHelper;
+import org.apache.tinkerpop.gremlin.spark.structure.io.InputRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.OutputFormatRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.OutputRDD;
import java.io.File;
import java.io.IOException;
@@ -209,6 +209,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
// write the map reduce output back to disk (memory)
SparkExecutor.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
}
+ mapReduceGraphRDD.unpersist();
}
// unpersist the graphRDD if it will no longer be used
@@ -217,9 +218,9 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
}
// update runtime and return the newly computed graph
finalMemory.setRuntime(System.currentTimeMillis() - startTime);
- return new DefaultComputerResult(HadoopHelper.getOutputGraph(this.hadoopGraph, this.resultGraph, this.persist), finalMemory.asImmutable());
+ return new DefaultComputerResult(InputOutputHelper.getOutputGraph(apacheConfiguration, this.resultGraph, this.persist), finalMemory.asImmutable());
} finally {
- if (sparkContext != null && !this.hadoopGraph.configuration().getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
+ if (sparkContext != null && !apacheConfiguration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
sparkContext.stop();
}
});
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputFormatRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputFormatRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputFormatRDD.java
deleted file mode 100644
index adb080b..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputFormatRDD.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import scala.Tuple2;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class InputFormatRDD implements InputRDD {
-
- @Override
- public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
- final org.apache.hadoop.conf.Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(configuration);
- return sparkContext.newAPIHadoopRDD(hadoopConfiguration,
- (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
- NullWritable.class,
- VertexWritable.class)
- .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new VertexWritable(tuple._2().get())))
- .reduceByKey((a, b) -> a); // if this is not done, then the graph is partitioned and you can have duplicate vertices
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java
deleted file mode 100644
index 291fcd3..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-
-/**
- * An InputRDD is used to read data from the underlying graph system and yield the respective adjacency list.
- * Note that {@link InputFormatRDD} is a type of InputRDD that simply uses the specified {@link org.apache.hadoop.mapreduce.InputFormat} to generate the respective graphRDD.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public interface InputRDD {
-
- /**
- * Read the graphRDD from the underlying graph system.
- * @param configuration the configuration for the {@link org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer}.
- * @param sparkContext the Spark context with the requisite methods for generating a {@link JavaPairRDD}.
- * @return an adjacency list representation of the underlying graph system.
- */
- public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext);
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputFormatRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputFormatRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputFormatRDD.java
deleted file mode 100644
index 56a1297..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputFormatRDD.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import scala.Tuple2;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class OutputFormatRDD implements OutputRDD {
-
- @Override
- public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
- final org.apache.hadoop.conf.Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(configuration);
- final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
- if (null != outputLocation) {
- // map back to a <nullwritable,vertexwritable> stream for output
- graphRDD.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), tuple._2()))
- .saveAsNewAPIHadoopFile(outputLocation + "/" + Constants.HIDDEN_G,
- NullWritable.class,
- VertexWritable.class,
- (Class<OutputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class), hadoopConfiguration);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputRDD.java
deleted file mode 100644
index 2580252..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputRDD.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public interface OutputRDD {
-
- public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD);
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputRDD.java
deleted file mode 100644
index ad521b3..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputRDD.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.rdd.RDD;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import scala.Tuple2;
-import scala.collection.Iterator;
-
-import java.util.Optional;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class PersistedInputRDD implements InputRDD {
-
- @Override
- public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
- final String inputRDDName = configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION, null);
- if (null == inputRDDName)
- throw new IllegalArgumentException(PersistedInputRDD.class.getSimpleName() + " requires " + Constants.GREMLIN_HADOOP_INPUT_LOCATION + " in order to retrieve the named graphRDD from the SparkContext");
- if (!PersistedInputRDD.getPersistedRDD(sparkContext, inputRDDName).isPresent())
- throw new IllegalArgumentException("The provided graphRDD name is not in the persisted RDDs of the SparkContext: " + inputRDDName);
- return JavaPairRDD.fromJavaRDD((JavaRDD) PersistedInputRDD.getPersistedRDD(sparkContext, inputRDDName).get().toJavaRDD());
- }
-
- public static Optional<RDD<?>> getPersistedRDD(final JavaSparkContext sparkContext, final String rddName) {
- final Iterator<Tuple2<Object, RDD<?>>> iterator = JavaSparkContext.toSparkContext(sparkContext).
- getPersistentRDDs().
- toList().iterator();
- while (iterator.hasNext()) {
- final Tuple2<Object, RDD<?>> tuple2 = iterator.next();
- if (tuple2._2().toString().contains(rddName))
- return Optional.of(tuple2._2());
- }
- return Optional.empty();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedOutputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedOutputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedOutputRDD.java
deleted file mode 100644
index 1832ca3..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedOutputRDD.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class PersistedOutputRDD implements OutputRDD {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(PersistedOutputRDD.class);
-
- @Override
- public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
- if (!configuration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
- LOGGER.warn("The SparkContext should be persisted in order for the RDD to persist across jobs. To do so, set " + Constants.GREMLIN_SPARK_PERSIST_CONTEXT + " to true");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputFormatRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputFormatRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputFormatRDD.java
new file mode 100644
index 0000000..f768939
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputFormatRDD.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import scala.Tuple2;
+
+/** {@link InputRDD} that builds the graphRDD through the Hadoop {@code InputFormat} class configured under {@code Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT}.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class InputFormatRDD implements InputRDD {
+
+ @Override
+ public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
+ final org.apache.hadoop.conf.Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(configuration); // commons-configuration in, Hadoop Configuration out
+ return sparkContext.newAPIHadoopRDD(hadoopConfiguration,
+ (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class), // unchecked cast; InputFormat.class is getClass's second argument (NOTE(review): the default value per Hadoop's Configuration API -- confirm)
+ NullWritable.class,
+ VertexWritable.class)
+ .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new VertexWritable(tuple._2().get()))) // re-key each record by vertex id and place the vertex in a new VertexWritable
+ .reduceByKey((a, b) -> a); // keeps a single value per id; if this is not done, then the graph is partitioned and you can have duplicate vertices
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputHelper.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputHelper.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputHelper.java
new file mode 100644
index 0000000..b1e1b0a
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputHelper.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Static registry pairing each {@link InputRDD} implementation with its {@link OutputRDD} counterpart, plus a
+ * helper for opening the {@link HadoopGraph} that reads back what a job wrote.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class InputOutputHelper {
+
+    // the two maps are mirror images of each other: every registration writes to both
+    private static final Map<Class<? extends InputRDD>, Class<? extends OutputRDD>> INPUT_TO_OUTPUT_CACHE = new ConcurrentHashMap<>();
+    private static final Map<Class<? extends OutputRDD>, Class<? extends InputRDD>> OUTPUT_TO_INPUT_CACHE = new ConcurrentHashMap<>();
+
+    static {
+        registerInputOutputPair(PersistedInputRDD.class, PersistedOutputRDD.class);
+        registerInputOutputPair(InputFormatRDD.class, OutputFormatRDD.class);
+    }
+
+    private InputOutputHelper() {
+        // static utility class: not instantiable
+    }
+
+    /**
+     * Looks up the {@link InputRDD} registered as the counterpart of the given {@link OutputRDD}.
+     *
+     * @param outputRDD the output RDD class
+     * @return the paired input RDD class, or {@code null} if no pair has been registered
+     */
+    public static Class<? extends InputRDD> getInputFormat(final Class<? extends OutputRDD> outputRDD) {
+        return OUTPUT_TO_INPUT_CACHE.get(outputRDD);
+    }
+
+    /**
+     * Looks up the {@link OutputRDD} registered as the counterpart of the given {@link InputRDD}.
+     *
+     * @param inputRDD the input RDD class
+     * @return the paired output RDD class, or {@code null} if no pair has been registered
+     */
+    public static Class<? extends OutputRDD> getOutputFormat(final Class<? extends InputRDD> inputRDD) {
+        return INPUT_TO_OUTPUT_CACHE.get(inputRDD);
+    }
+
+    /**
+     * Registers an input/output pair in both directions, replacing any earlier registration for either class.
+     *
+     * @param inputRDD  the input RDD class
+     * @param outputRDD the output RDD class that it is paired with
+     */
+    public static void registerInputOutputPair(final Class<? extends InputRDD> inputRDD, final Class<? extends OutputRDD> outputRDD) {
+        INPUT_TO_OUTPUT_CACHE.put(inputRDD, outputRDD);
+        OUTPUT_TO_INPUT_CACHE.put(outputRDD, inputRDD);
+    }
+
+    /**
+     * Opens the graph that reads the output of a job. The configuration is seeded from the graph returned by the
+     * hadoop-gremlin {@code InputOutputHelper.getOutputGraph()}. When the result graph is
+     * {@link GraphComputer.ResultGraph#NEW} and an output RDD is configured, the input RDD is set to the registered
+     * counterpart of that output RDD and a trailing {@code "/" + Constants.HIDDEN_G} is stripped from the input location.
+     *
+     * @param configuration the configuration of the job that produced the output
+     * @param resultGraph   the result graph setting of the job
+     * @param persist       the persist setting of the job
+     * @return a graph opened over the adjusted configuration
+     * @throws IllegalArgumentException if the configured output RDD class cannot be loaded or has no registered input RDD
+     */
+    public static HadoopGraph getOutputGraph(final Configuration configuration, final GraphComputer.ResultGraph resultGraph, final GraphComputer.Persist persist) {
+        try {
+            final HadoopConfiguration hadoopConfiguration = new HadoopConfiguration(configuration);
+            final BaseConfiguration newConfiguration = new BaseConfiguration();
+            newConfiguration.copy(org.apache.tinkerpop.gremlin.hadoop.structure.io.InputOutputHelper.getOutputGraph(configuration, resultGraph, persist).configuration());
+            if (resultGraph.equals(GraphComputer.ResultGraph.NEW) && hadoopConfiguration.containsKey(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD)) {
+                final String outputRDDName = hadoopConfiguration.getProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD).toString();
+                final Class<? extends InputRDD> inputRDD = OUTPUT_TO_INPUT_CACHE.get(Class.forName(outputRDDName));
+                // fail with a descriptive message rather than a NullPointerException on an unregistered class
+                if (null == inputRDD)
+                    throw new IllegalArgumentException("No InputRDD is registered for the OutputRDD " + outputRDDName + ": register one with registerInputOutputPair()");
+                newConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, inputRDD.getCanonicalName());
+                final String hiddenSuffix = "/" + Constants.HIDDEN_G;
+                final String inputLocation = newConfiguration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION, "");
+                if (inputLocation.endsWith(hiddenSuffix)) { // Spark RDDs are not namespaced the same as Hadoop
+                    // strip only the trailing suffix; earlier occurrences inside the path are left intact
+                    newConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, inputLocation.substring(0, inputLocation.length() - hiddenSuffix.length()));
+                }
+            }
+            return HadoopGraph.open(newConfiguration);
+        } catch (final ClassNotFoundException e) {
+            throw new IllegalArgumentException(e.getMessage(), e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDD.java
new file mode 100644
index 0000000..c84c189
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDD.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+
+/**
+ * The contract for loading a graph into Spark: an InputRDD reads from the underlying graph system and produces the
+ * graph's adjacency list as a graphRDD. {@link InputFormatRDD} is the implementation that simply delegates to the
+ * specified {@link org.apache.hadoop.mapreduce.InputFormat}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface InputRDD {
+
+    /**
+     * Builds the graphRDD from the underlying graph system.
+     *
+     * @param configuration the {@link org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer} configuration
+     * @param sparkContext  the Spark context supplying the methods needed to create a {@link JavaPairRDD}
+     * @return the underlying graph system represented as an adjacency list
+     */
+    JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext);
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputFormatRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputFormatRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputFormatRDD.java
new file mode 100644
index 0000000..cc6ed61
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputFormatRDD.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import scala.Tuple2;
+
+/**
+ * An {@link OutputRDD} that saves the graphRDD as a Hadoop file using the {@link OutputFormat} configured under
+ * {@link Constants#GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class OutputFormatRDD implements OutputRDD {
+
+    /**
+     * Saves the graphRDD beneath the configured output location, in the sub-path named by
+     * {@link Constants#HIDDEN_G}. Does nothing when no {@link Constants#GREMLIN_HADOOP_OUTPUT_LOCATION} is configured.
+     *
+     * @param configuration the configuration, converted to a Hadoop configuration before use
+     * @param graphRDD      the graph to save
+     */
+    @Override
+    public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
+        final org.apache.hadoop.conf.Configuration hadoopConf = ConfUtil.makeHadoopConfiguration(configuration);
+        final String outputLocation = hadoopConf.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
+        if (null == outputLocation)
+            return; // nowhere to write to
+        // map back to a <nullwritable,vertexwritable> stream for output
+        final JavaPairRDD<NullWritable, VertexWritable> outputRDD =
+                graphRDD.mapToPair(entry -> new Tuple2<>(NullWritable.get(), entry._2()));
+        final Class<OutputFormat<NullWritable, VertexWritable>> outputFormatClass =
+                (Class<OutputFormat<NullWritable, VertexWritable>>) hadoopConf.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class);
+        outputRDD.saveAsNewAPIHadoopFile(outputLocation + "/" + Constants.HIDDEN_G,
+                NullWritable.class,
+                VertexWritable.class,
+                outputFormatClass,
+                hadoopConf);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDD.java
new file mode 100644
index 0000000..c2964eb
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDD.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+
+/**
+ * The contract for getting a graphRDD out of Spark; the counterpart of {@link InputRDD}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface OutputRDD {
+
+    /**
+     * Writes the given graphRDD in whatever way the implementation defines.
+     *
+     * @param configuration the configuration the implementation reads its output settings from
+     * @param graphRDD      the graph to write, as keyed pairs whose values are the vertices
+     */
+    void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD);
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java
new file mode 100644
index 0000000..e3c27ec
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.rdd.RDD;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import scala.Tuple2;
+import scala.collection.Iterator;
+
+import java.util.Optional;
+
+/**
+ * An {@link InputRDD} that retrieves an already persisted graphRDD from the SparkContext instead of reading the
+ * graph from storage. The RDD is identified by the value of {@link Constants#GREMLIN_HADOOP_INPUT_LOCATION}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class PersistedInputRDD implements InputRDD {
+
+    /**
+     * Returns the persisted RDD whose name is configured as the input location.
+     *
+     * @param configuration the configuration holding {@link Constants#GREMLIN_HADOOP_INPUT_LOCATION}
+     * @param sparkContext  the Spark context whose persistent RDDs are searched
+     * @return the matching persisted RDD, viewed as a graphRDD
+     * @throws IllegalArgumentException if no input location is configured or no persisted RDD matches it
+     */
+    @Override
+    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
+        final String inputRDDName = configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION, null);
+        if (null == inputRDDName)
+            throw new IllegalArgumentException(PersistedInputRDD.class.getSimpleName() + " requires " + Constants.GREMLIN_HADOOP_INPUT_LOCATION + " in order to retrieve the named graphRDD from the SparkContext");
+        // look the RDD up exactly once so the presence check and the retrieval cannot disagree
+        final RDD<?> persistedRDD = PersistedInputRDD.getPersistedRDD(sparkContext, inputRDDName)
+                .orElseThrow(() -> new IllegalArgumentException("The provided graphRDD name is not in the persisted RDDs of the SparkContext: " + inputRDDName));
+        return JavaPairRDD.fromJavaRDD((JavaRDD) persistedRDD.toJavaRDD());
+    }
+
+    /**
+     * Searches the persistent RDDs of the SparkContext for one whose {@code toString()} contains the given name.
+     * NOTE(review): this is a substring match, so a name that is contained in another RDD's description could
+     * select the wrong RDD -- confirm against how the graphRDD names are assigned.
+     *
+     * @param sparkContext the Spark context whose persistent RDDs are searched
+     * @param rddName      the name to look for
+     * @return the first matching RDD in iteration order, or an empty Optional if there is none
+     */
+    public static Optional<RDD<?>> getPersistedRDD(final JavaSparkContext sparkContext, final String rddName) {
+        final Iterator<Tuple2<Object, RDD<?>>> iterator = JavaSparkContext.toSparkContext(sparkContext).
+                getPersistentRDDs().
+                toList().iterator();
+        while (iterator.hasNext()) {
+            final Tuple2<Object, RDD<?>> tuple2 = iterator.next();
+            if (tuple2._2().toString().contains(rddName))
+                return Optional.of(tuple2._2());
+        }
+        return Optional.empty();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
new file mode 100644
index 0000000..abeeaaa
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@link OutputRDD} counterpart of {@link PersistedInputRDD}. As written here it performs no write of its own:
+ * it only warns when the SparkContext is not configured to be persisted.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class PersistedOutputRDD implements OutputRDD {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PersistedOutputRDD.class);
+
+    /**
+     * Logs a warning unless {@link Constants#GREMLIN_SPARK_PERSIST_CONTEXT} is set to true (it defaults to false).
+     * NOTE(review): the graphRDD itself is not touched here -- presumably it is persisted by the caller; verify.
+     *
+     * @param configuration the configuration holding the persist-context setting
+     * @param graphRDD      the graph RDD; unused by this method
+     */
+    @Override
+    public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
+        final boolean contextPersisted = configuration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false);
+        if (contextPersisted)
+            return;
+        LOGGER.warn("The SparkContext should be persisted in order for the RDD to persist across jobs. To do so, set " + Constants.GREMLIN_SPARK_PERSIST_CONTEXT + " to true");
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/ExampleInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/ExampleInputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/ExampleInputRDD.java
deleted file mode 100644
index 1fb85a1..0000000
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/ExampleInputRDD.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.structure.T;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
-import scala.Tuple2;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class ExampleInputRDD implements InputRDD {
-
- @Override
- public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
- final List<Vertex> list = new ArrayList<>();
- list.add(StarGraph.open().addVertex(T.id, 1l, T.label,"person","age", 29));
- list.add(StarGraph.open().addVertex(T.id, 2l, T.label,"person","age", 27));
- list.add(StarGraph.open().addVertex(T.id, 4l, T.label,"person","age", 32));
- list.add(StarGraph.open().addVertex(T.id, 6l, T.label,"person","age", 35));
- return sparkContext.parallelize(list).mapToPair(vertex -> new Tuple2<>(vertex.id(), new VertexWritable(vertex)));
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/ExampleOutputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/ExampleOutputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/ExampleOutputRDD.java
deleted file mode 100644
index bb38f7f..0000000
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/ExampleOutputRDD.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-
-import java.util.Iterator;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class ExampleOutputRDD implements OutputRDD {
- @Override
- public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
- int totalAge = 0;
- final Iterator<VertexWritable> iterator = graphRDD.values().toLocalIterator();
- while (iterator.hasNext()) {
- final Vertex vertex = iterator.next().get();
- if (vertex.label().equals("person"))
- totalAge = totalAge + vertex.<Integer>value("age");
- }
- assertEquals(123, totalAge);
- }
-}