Posted to commits@spark.apache.org by ma...@apache.org on 2014/03/18 19:06:24 UTC

git commit: SPARK-1102: Create a saveAsNewAPIHadoopDataset method

Repository: spark
Updated Branches:
  refs/heads/master e7423d404 -> 2fa26ec02


SPARK-1102: Create a saveAsNewAPIHadoopDataset method

https://spark-project.atlassian.net/browse/SPARK-1102

Create a saveAsNewAPIHadoopDataset method

By @mateiz: "Right now RDDs can only be saved as files using the new Hadoop API, not as "datasets" with no filename and just a JobConf. See http://codeforhire.com/2014/02/18/using-spark-with-mongodb/ for an example of how you have to give a bogus filename. For the old Hadoop API, we have saveAsHadoopDataset."
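
For context, a minimal sketch of how the new method is meant to be called (it mirrors the test added in this commit). The output directory and the TextOutputFormat choice are illustrative only; a storage connector such as the MongoDB one linked above would instead set its own OutputFormat and connection properties in the Configuration, with no filename at all:

    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
    import org.apache.spark.SparkContext._   // implicit conversion to PairRDDFunctions

    val pairs = sc.parallelize(Seq(("key1", "a"), ("key2", "b")))
    val job = new Job(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[String])
    job.setOutputValueClass(classOf[String])
    job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]])
    // Hypothetical output location; a non-file storage system would set its
    // own properties here instead of an output directory.
    job.getConfiguration.set("mapred.output.dir", "/tmp/dataset-out")
    pairs.saveAsNewAPIHadoopDataset(job.getConfiguration)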

Author: CodingCat <zh...@gmail.com>

Closes #12 from CodingCat/SPARK-1102 and squashes the following commits:

6ba0c83 [CodingCat] add test cases for saveAsHadoopDataSet (new&old API)
a8d11ba [CodingCat] style fix.........
95a6929 [CodingCat] code clean
7643c88 [CodingCat] change the parameter type back to Configuration
a8583ee [CodingCat] Create a saveAsNewAPIHadoopDataset method


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2fa26ec0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2fa26ec0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2fa26ec0

Branch: refs/heads/master
Commit: 2fa26ec02fc2251102f89bb67523419fd7dd3757
Parents: e7423d4
Author: CodingCat <zh...@gmail.com>
Authored: Tue Mar 18 11:06:18 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Tue Mar 18 11:06:18 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/api/java/JavaPairRDD.scala |  10 +-
 .../org/apache/spark/rdd/PairRDDFunctions.scala | 104 +++++++++++--------
 .../test/scala/org/apache/spark/FileSuite.scala |  39 +++++--
 3 files changed, 100 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2fa26ec0/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 0ff428c..9596dba 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -26,7 +26,7 @@ import com.google.common.base.Optional
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.mapred.{JobConf, OutputFormat}
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job}
 
 import org.apache.spark.{HashPartitioner, Partitioner}
 import org.apache.spark.Partitioner._
@@ -558,6 +558,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
     rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
   }
 
+  /**
+   * Output the RDD to any Hadoop-supported storage system, using
+   * a Configuration object for that storage system.
+   */
+  def saveAsNewAPIHadoopDataset(conf: Configuration) {
+    rdd.saveAsNewAPIHadoopDataset(conf)
+  }
+
   /** Output the RDD to any Hadoop-supported file system. */
   def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
       path: String,

http://git-wip-us.apache.org/repos/asf/spark/blob/2fa26ec0/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index b0d322f..447deaf 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -30,11 +30,11 @@ import scala.reflect.ClassTag
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLog
 import org.apache.hadoop.conf.{Configurable, Configuration}
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob, RecordWriter => NewRecordWriter, JobContext, SparkHadoopMapReduceUtil}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob, RecordWriter => NewRecordWriter, SparkHadoopMapReduceUtil}
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
 
 // SparkHadoopWriter and SparkHadoopMapReduceUtil are actually source files defined in Spark.
@@ -603,50 +603,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     val job = new NewAPIHadoopJob(conf)
     job.setOutputKeyClass(keyClass)
     job.setOutputValueClass(valueClass)
-
-    val wrappedConf = new SerializableWritable(job.getConfiguration)
-    val outpath = new Path(path)
-    NewFileOutputFormat.setOutputPath(job, outpath)
-    val jobFormat = outputFormatClass.newInstance
-    jobFormat.checkOutputSpecs(job)
-    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
-    val jobtrackerID = formatter.format(new Date())
-    val stageId = self.id
-    def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = {
-      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
-      // around by taking a mod. We expect that no task will be attempted 2 billion times.
-      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
-      /* "reduce task" <split #> <attempt # = spark task #> */
-      val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
-        attemptNumber)
-      val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
-      val format = outputFormatClass.newInstance
-      format match {
-        case c: Configurable => c.setConf(wrappedConf.value)
-        case _ => ()
-      }
-      val committer = format.getOutputCommitter(hadoopContext)
-      committer.setupTask(hadoopContext)
-      val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
-      while (iter.hasNext) {
-        val (k, v) = iter.next()
-        writer.write(k, v)
-      }
-      writer.close(hadoopContext)
-      committer.commitTask(hadoopContext)
-      return 1
-    }
-
-    /* apparently we need a TaskAttemptID to construct an OutputCommitter;
-     * however we're only going to use this local OutputCommitter for
-     * setupJob/commitJob, so we just use a dummy "map" task.
-     */
-    val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
-    val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
-    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
-    jobCommitter.setupJob(jobTaskContext)
-    self.context.runJob(self, writeShard _)
-    jobCommitter.commitJob(jobTaskContext)
+    job.setOutputFormatClass(outputFormatClass)
+    job.getConfiguration.set("mapred.output.dir", path)
+    saveAsNewAPIHadoopDataset(job.getConfiguration)
   }
 
   /**
@@ -693,6 +652,59 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
   }
 
   /**
+   * Output the RDD to any Hadoop-supported storage system with new Hadoop API, using a Hadoop
+   * Configuration object for that storage system. The Conf should set an OutputFormat and any
+   * output paths required (e.g. a table name to write to) in the same way as it would be
+   * configured for a Hadoop MapReduce job.
+   */
+  def saveAsNewAPIHadoopDataset(conf: Configuration) {
+    val job = new NewAPIHadoopJob(conf)
+    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+    val jobtrackerID = formatter.format(new Date())
+    val stageId = self.id
+    val wrappedConf = new SerializableWritable(job.getConfiguration)
+    val outfmt = job.getOutputFormatClass
+    val jobFormat = outfmt.newInstance
+
+    if (jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
+      // FileOutputFormat ignores the filesystem parameter
+      jobFormat.checkOutputSpecs(job)
+    }
+
+    def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = {
+      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
+      // around by taking a mod. We expect that no task will be attempted 2 billion times.
+      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
+      /* "reduce task" <split #> <attempt # = spark task #> */
+      val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
+        attemptNumber)
+      val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
+      val format = outfmt.newInstance
+      format match {
+        case c: Configurable => c.setConf(wrappedConf.value)
+        case _ => ()
+      }
+      val committer = format.getOutputCommitter(hadoopContext)
+      committer.setupTask(hadoopContext)
+      val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
+      while (iter.hasNext) {
+        val (k, v) = iter.next()
+        writer.write(k, v)
+      }
+      writer.close(hadoopContext)
+      committer.commitTask(hadoopContext)
+      return 1
+    }
+
+    val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
+    val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
+    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
+    jobCommitter.setupJob(jobTaskContext)
+    self.context.runJob(self, writeShard _)
+    jobCommitter.commitJob(jobTaskContext)
+  }
+
+  /**
    * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
    * that storage system. The JobConf should set an OutputFormat and any output paths required
    * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop

http://git-wip-us.apache.org/repos/asf/spark/blob/2fa26ec0/core/src/test/scala/org/apache/spark/FileSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 7617360..01af940 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -24,11 +24,12 @@ import scala.io.Source
 import com.google.common.io.Files
 import org.apache.hadoop.io._
 import org.apache.hadoop.io.compress.DefaultCodec
-import org.apache.hadoop.mapred.FileAlreadyExistsException
+import org.apache.hadoop.mapred.{JobConf, FileAlreadyExistsException, TextOutputFormat}
+import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
+import org.apache.hadoop.mapreduce.Job
 import org.scalatest.FunSuite
 
 import org.apache.spark.SparkContext._
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
 
 class FileSuite extends FunSuite with LocalSparkContext {
 
@@ -236,7 +237,7 @@ class FileSuite extends FunSuite with LocalSparkContext {
     val tempdir = Files.createTempDir()
     val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
     intercept[FileAlreadyExistsException] {
-      randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath)
+      randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath)
     }
   }
 
@@ -244,10 +245,36 @@ class FileSuite extends FunSuite with LocalSparkContext {
     sc = new SparkContext("local", "test")
     val tempdir = Files.createTempDir()
     val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
-    randomRDD.saveAsTextFile(tempdir.getPath + "/output")
-    assert(new File(tempdir.getPath + "/output/part-00000").exists() === true)
+    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath + "/output")
+    assert(new File(tempdir.getPath + "/output/part-r-00000").exists() === true)
     intercept[FileAlreadyExistsException] {
-      randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath)
+      randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath)
     }
   }
+
+  test ("save Hadoop Dataset through old Hadoop API") {
+    sc = new SparkContext("local", "test")
+    val tempdir = Files.createTempDir()
+    val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+    val job = new JobConf()
+    job.setOutputKeyClass(classOf[String])
+    job.setOutputValueClass(classOf[String])
+    job.set("mapred.output.format.class", classOf[TextOutputFormat[String, String]].getName)
+    job.set("mapred.output.dir", tempdir.getPath + "/outputDataset_old")
+    randomRDD.saveAsHadoopDataset(job)
+    assert(new File(tempdir.getPath + "/outputDataset_old/part-00000").exists() === true)
+  }
+
+  test ("save Hadoop Dataset through new Hadoop API") {
+    sc = new SparkContext("local", "test")
+    val tempdir = Files.createTempDir()
+    val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+    val job = new Job(sc.hadoopConfiguration)
+    job.setOutputKeyClass(classOf[String])
+    job.setOutputValueClass(classOf[String])
+    job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]])
+    job.getConfiguration.set("mapred.output.dir", tempdir.getPath + "/outputDataset_new")
+    randomRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
+    assert(new File(tempdir.getPath + "/outputDataset_new/part-r-00000").exists() === true)
+  }
 }