You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/06/18 04:35:44 UTC
git commit: Revert "SPARK-2038: rename "conf" parameters in the
saveAsHadoop functions"
Repository: spark
Updated Branches:
refs/heads/master d2f4f30b1 -> 9e4b4bd08
Revert "SPARK-2038: rename "conf" parameters in the saveAsHadoop functions"
This reverts commit 443f5e1bbcf9ec55e5ce6e4f738a002a47818100.
This commit would unfortunately break source compatibility for users who
call these methods passing the parameter by name (i.e. hadoopConf = ...).
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9e4b4bd0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9e4b4bd0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9e4b4bd0
Branch: refs/heads/master
Commit: 9e4b4bd0837cfc4ef1af1edcbc56290821e49e92
Parents: d2f4f30
Author: Patrick Wendell <pw...@gmail.com>
Authored: Tue Jun 17 19:34:17 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Tue Jun 17 19:34:17 2014 -0700
----------------------------------------------------------------------
.../org/apache/spark/rdd/PairRDDFunctions.scala | 49 ++++++++++----------
1 file changed, 24 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/9e4b4bd0/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index bff77b4..fe36c80 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -719,9 +719,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
- hadoopConf: Configuration = self.context.hadoopConfiguration)
+ conf: Configuration = self.context.hadoopConfiguration)
{
- val job = new NewAPIHadoopJob(hadoopConf)
+ val job = new NewAPIHadoopJob(conf)
job.setOutputKeyClass(keyClass)
job.setOutputValueClass(valueClass)
job.setOutputFormatClass(outputFormatClass)
@@ -752,25 +752,24 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: OutputFormat[_, _]],
- hadoopConf: JobConf = new JobConf(self.context.hadoopConfiguration),
+ conf: JobConf = new JobConf(self.context.hadoopConfiguration),
codec: Option[Class[_ <: CompressionCodec]] = None) {
- hadoopConf.setOutputKeyClass(keyClass)
- hadoopConf.setOutputValueClass(valueClass)
+ conf.setOutputKeyClass(keyClass)
+ conf.setOutputValueClass(valueClass)
// Doesn't work in Scala 2.9 due to what may be a generics bug
// TODO: Should we uncomment this for Scala 2.10?
// conf.setOutputFormat(outputFormatClass)
- hadoopConf.set("mapred.output.format.class", outputFormatClass.getName)
+ conf.set("mapred.output.format.class", outputFormatClass.getName)
for (c <- codec) {
- hadoopConf.setCompressMapOutput(true)
- hadoopConf.set("mapred.output.compress", "true")
- hadoopConf.setMapOutputCompressorClass(c)
- hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
- hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
+ conf.setCompressMapOutput(true)
+ conf.set("mapred.output.compress", "true")
+ conf.setMapOutputCompressorClass(c)
+ conf.set("mapred.output.compression.codec", c.getCanonicalName)
+ conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
}
- hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
- FileOutputFormat.setOutputPath(hadoopConf,
- SparkHadoopWriter.createPathFromString(path, hadoopConf))
- saveAsHadoopDataset(hadoopConf)
+ conf.setOutputCommitter(classOf[FileOutputCommitter])
+ FileOutputFormat.setOutputPath(conf, SparkHadoopWriter.createPathFromString(path, conf))
+ saveAsHadoopDataset(conf)
}
/**
@@ -779,8 +778,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* output paths required (e.g. a table name to write to) in the same way as it would be
* configured for a Hadoop MapReduce job.
*/
- def saveAsNewAPIHadoopDataset(hadoopConf: Configuration) {
- val job = new NewAPIHadoopJob(hadoopConf)
+ def saveAsNewAPIHadoopDataset(conf: Configuration) {
+ val job = new NewAPIHadoopJob(conf)
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
val jobtrackerID = formatter.format(new Date())
val stageId = self.id
@@ -836,10 +835,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
* MapReduce job.
*/
- def saveAsHadoopDataset(hadoopConf: JobConf) {
- val outputFormatInstance = hadoopConf.getOutputFormat
- val keyClass = hadoopConf.getOutputKeyClass
- val valueClass = hadoopConf.getOutputValueClass
+ def saveAsHadoopDataset(conf: JobConf) {
+ val outputFormatInstance = conf.getOutputFormat
+ val keyClass = conf.getOutputKeyClass
+ val valueClass = conf.getOutputValueClass
if (outputFormatInstance == null) {
throw new SparkException("Output format class not set")
}
@@ -849,18 +848,18 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
if (valueClass == null) {
throw new SparkException("Output value class not set")
}
- SparkHadoopUtil.get.addCredentials(hadoopConf)
+ SparkHadoopUtil.get.addCredentials(conf)
logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
valueClass.getSimpleName + ")")
if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
// FileOutputFormat ignores the filesystem parameter
- val ignoredFs = FileSystem.get(hadoopConf)
- hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
+ val ignoredFs = FileSystem.get(conf)
+ conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf)
}
- val writer = new SparkHadoopWriter(hadoopConf)
+ val writer = new SparkHadoopWriter(conf)
writer.preSetup()
def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {