You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/06/18 04:35:44 UTC

git commit: Revert "SPARK-2038: rename "conf" parameters in the saveAsHadoop functions"

Repository: spark
Updated Branches:
  refs/heads/master d2f4f30b1 -> 9e4b4bd08


Revert "SPARK-2038: rename "conf" parameters in the saveAsHadoop functions"

This reverts commit 443f5e1bbcf9ec55e5ce6e4f738a002a47818100.

This commit unfortunately would break source compatibility if users have named
the hadoopConf parameter.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9e4b4bd0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9e4b4bd0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9e4b4bd0

Branch: refs/heads/master
Commit: 9e4b4bd0837cfc4ef1af1edcbc56290821e49e92
Parents: d2f4f30
Author: Patrick Wendell <pw...@gmail.com>
Authored: Tue Jun 17 19:34:17 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Tue Jun 17 19:34:17 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/rdd/PairRDDFunctions.scala | 49 ++++++++++----------
 1 file changed, 24 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9e4b4bd0/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index bff77b4..fe36c80 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -719,9 +719,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
-      hadoopConf: Configuration = self.context.hadoopConfiguration)
+      conf: Configuration = self.context.hadoopConfiguration)
   {
-    val job = new NewAPIHadoopJob(hadoopConf)
+    val job = new NewAPIHadoopJob(conf)
     job.setOutputKeyClass(keyClass)
     job.setOutputValueClass(valueClass)
     job.setOutputFormatClass(outputFormatClass)
@@ -752,25 +752,24 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: OutputFormat[_, _]],
-      hadoopConf: JobConf = new JobConf(self.context.hadoopConfiguration),
+      conf: JobConf = new JobConf(self.context.hadoopConfiguration),
       codec: Option[Class[_ <: CompressionCodec]] = None) {
-    hadoopConf.setOutputKeyClass(keyClass)
-    hadoopConf.setOutputValueClass(valueClass)
+    conf.setOutputKeyClass(keyClass)
+    conf.setOutputValueClass(valueClass)
     // Doesn't work in Scala 2.9 due to what may be a generics bug
     // TODO: Should we uncomment this for Scala 2.10?
     // conf.setOutputFormat(outputFormatClass)
-    hadoopConf.set("mapred.output.format.class", outputFormatClass.getName)
+    conf.set("mapred.output.format.class", outputFormatClass.getName)
     for (c <- codec) {
-      hadoopConf.setCompressMapOutput(true)
-      hadoopConf.set("mapred.output.compress", "true")
-      hadoopConf.setMapOutputCompressorClass(c)
-      hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
-      hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
+      conf.setCompressMapOutput(true)
+      conf.set("mapred.output.compress", "true")
+      conf.setMapOutputCompressorClass(c)
+      conf.set("mapred.output.compression.codec", c.getCanonicalName)
+      conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
     }
-    hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
-    FileOutputFormat.setOutputPath(hadoopConf,
-      SparkHadoopWriter.createPathFromString(path, hadoopConf))
-    saveAsHadoopDataset(hadoopConf)
+    conf.setOutputCommitter(classOf[FileOutputCommitter])
+    FileOutputFormat.setOutputPath(conf, SparkHadoopWriter.createPathFromString(path, conf))
+    saveAsHadoopDataset(conf)
   }
 
   /**
@@ -779,8 +778,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * output paths required (e.g. a table name to write to) in the same way as it would be
    * configured for a Hadoop MapReduce job.
    */
-  def saveAsNewAPIHadoopDataset(hadoopConf: Configuration) {
-    val job = new NewAPIHadoopJob(hadoopConf)
+  def saveAsNewAPIHadoopDataset(conf: Configuration) {
+    val job = new NewAPIHadoopJob(conf)
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     val jobtrackerID = formatter.format(new Date())
     val stageId = self.id
@@ -836,10 +835,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
    * MapReduce job.
    */
-  def saveAsHadoopDataset(hadoopConf: JobConf) {
-    val outputFormatInstance = hadoopConf.getOutputFormat
-    val keyClass = hadoopConf.getOutputKeyClass
-    val valueClass = hadoopConf.getOutputValueClass
+  def saveAsHadoopDataset(conf: JobConf) {
+    val outputFormatInstance = conf.getOutputFormat
+    val keyClass = conf.getOutputKeyClass
+    val valueClass = conf.getOutputValueClass
     if (outputFormatInstance == null) {
       throw new SparkException("Output format class not set")
     }
@@ -849,18 +848,18 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     if (valueClass == null) {
       throw new SparkException("Output value class not set")
     }
-    SparkHadoopUtil.get.addCredentials(hadoopConf)
+    SparkHadoopUtil.get.addCredentials(conf)
 
     logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
       valueClass.getSimpleName + ")")
 
     if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
       // FileOutputFormat ignores the filesystem parameter
-      val ignoredFs = FileSystem.get(hadoopConf)
-      hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
+      val ignoredFs = FileSystem.get(conf)
+      conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf)
     }
 
-    val writer = new SparkHadoopWriter(hadoopConf)
+    val writer = new SparkHadoopWriter(conf)
     writer.preSetup()
 
     def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {