You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/06/05 20:39:40 UTC
git commit: SPARK-1677: allow user to disable output dir existence checking
Repository: spark
Updated Branches:
refs/heads/master 7c160293d -> 89cdbb087
SPARK-1677: allow user to disable output dir existence checking
https://issues.apache.org/jira/browse/SPARK-1677
For compatibility with older versions of Spark, it would be nice to have an option `spark.hadoop.validateOutputSpecs` (default: true) that lets the user disable the output directory existence checking.
Author: CodingCat <zh...@gmail.com>
Closes #947 from CodingCat/SPARK-1677 and squashes the following commits:
7930f83 [CodingCat] miao
c0c0e03 [CodingCat] bug fix and doc update
5318562 [CodingCat] bug fix
13219b5 [CodingCat] allow user to disable output dir existence checking
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/89cdbb08
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/89cdbb08
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/89cdbb08
Branch: refs/heads/master
Commit: 89cdbb087cb2f0d03be2dd77440300c6bd61c792
Parents: 7c16029
Author: CodingCat <zh...@gmail.com>
Authored: Thu Jun 5 11:39:35 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Thu Jun 5 11:39:35 2014 -0700
----------------------------------------------------------------------
.../org/apache/spark/rdd/PairRDDFunctions.scala | 6 ++++--
.../test/scala/org/apache/spark/FileSuite.scala | 22 ++++++++++++++++++++
docs/configuration.md | 8 +++++++
3 files changed, 34 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/89cdbb08/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index f2ce3cb..8909980 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -737,7 +737,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val outfmt = job.getOutputFormatClass
val jobFormat = outfmt.newInstance
- if (jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
+ if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
+ jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
// FileOutputFormat ignores the filesystem parameter
jobFormat.checkOutputSpecs(job)
}
@@ -803,7 +804,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
valueClass.getSimpleName + ")")
- if (outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
+ if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
+ outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
// FileOutputFormat ignores the filesystem parameter
val ignoredFs = FileSystem.get(conf)
conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf)
http://git-wip-us.apache.org/repos/asf/spark/blob/89cdbb08/core/src/test/scala/org/apache/spark/FileSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 1f2206b..070e974 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -230,6 +230,17 @@ class FileSuite extends FunSuite with LocalSparkContext {
}
}
+ test ("allow user to disable the output directory existence checking (old Hadoop API") {
+ val sf = new SparkConf()
+ sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
+ sc = new SparkContext(sf)
+ val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
+ randomRDD.saveAsTextFile(tempDir.getPath + "/output")
+ assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)
+ randomRDD.saveAsTextFile(tempDir.getPath + "/output")
+ assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)
+ }
+
test ("prevent user from overwriting the empty directory (new Hadoop API)") {
sc = new SparkContext("local", "test")
val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
@@ -248,6 +259,17 @@ class FileSuite extends FunSuite with LocalSparkContext {
}
}
+ test ("allow user to disable the output directory existence checking (new Hadoop API") {
+ val sf = new SparkConf()
+ sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
+ sc = new SparkContext(sf)
+ val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+ randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output")
+ assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
+ randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output")
+ assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
+ }
+
test ("save Hadoop Dataset through old Hadoop API") {
sc = new SparkContext("local", "test")
val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
http://git-wip-us.apache.org/repos/asf/spark/blob/89cdbb08/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 0697f7f..71fafa5 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -487,6 +487,14 @@ Apart from these, the following properties are also available, and may be useful
this duration will be cleared as well.
</td>
</tr>
+<tr>
+ <td>spark.hadoop.validateOutputSpecs</td>
+ <td>true</td>
+ <td>If set to true, validates the output specification (e.g. checking if the output directory already exists)
+ used in saveAsHadoopFile and other variants. This can be disabled to silence exceptions due to pre-existing
+ output directories. We recommend that users do not disable this except if trying to achieve compatibility with
+ previous versions of Spark. Simply use Hadoop's FileSystem API to delete output directories by hand.</td>
+</tr>
</table>
#### Networking