Posted to commits@spark.apache.org by pw...@apache.org on 2014/06/05 20:39:40 UTC

git commit: SPARK-1677: allow user to disable output dir existence checking

Repository: spark
Updated Branches:
  refs/heads/master 7c160293d -> 89cdbb087


SPARK-1677: allow user to disable output dir existence checking

https://issues.apache.org/jira/browse/SPARK-1677

For compatibility with older versions of Spark, it would be nice to have an option `spark.hadoop.validateOutputSpecs` (default: true) that lets the user disable the output directory existence check.
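As a minimal sketch of the intended usage (the app name and output path below are illustrative, and this assumes the output directory already exists from an earlier run):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("overwrite-example")
      .setMaster("local")
      .set("spark.hadoop.validateOutputSpecs", "false")
    val sc = new SparkContext(conf)

    // With validation disabled, a second save into the same directory
    // succeeds instead of failing the output spec check.
    sc.parallelize(Seq((1, "a"), (2, "b")))
      .saveAsTextFile("/tmp/output")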

Author: CodingCat <zh...@gmail.com>

Closes #947 from CodingCat/SPARK-1677 and squashes the following commits:

7930f83 [CodingCat] miao
c0c0e03 [CodingCat] bug fix and doc update
5318562 [CodingCat] bug fix
13219b5 [CodingCat] allow user to disable output dir existence checking


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/89cdbb08
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/89cdbb08
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/89cdbb08

Branch: refs/heads/master
Commit: 89cdbb087cb2f0d03be2dd77440300c6bd61c792
Parents: 7c16029
Author: CodingCat <zh...@gmail.com>
Authored: Thu Jun 5 11:39:35 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Thu Jun 5 11:39:35 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/rdd/PairRDDFunctions.scala |  6 ++++--
 .../test/scala/org/apache/spark/FileSuite.scala | 22 ++++++++++++++++++++
 docs/configuration.md                           |  8 +++++++
 3 files changed, 34 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/89cdbb08/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index f2ce3cb..8909980 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -737,7 +737,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val outfmt = job.getOutputFormatClass
     val jobFormat = outfmt.newInstance
 
-    if (jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
+    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
+      jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
       // FileOutputFormat ignores the filesystem parameter
       jobFormat.checkOutputSpecs(job)
     }
@@ -803,7 +804,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
       valueClass.getSimpleName + ")")
 
-    if (outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
+    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
+      outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
       // FileOutputFormat ignores the filesystem parameter
       val ignoredFs = FileSystem.get(conf)
       conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf)

http://git-wip-us.apache.org/repos/asf/spark/blob/89cdbb08/core/src/test/scala/org/apache/spark/FileSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 1f2206b..070e974 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -230,6 +230,17 @@ class FileSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test ("allow user to disable the output directory existence checking (old Hadoop API") {
+    val sf = new SparkConf()
+    sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
+    sc = new SparkContext(sf)
+    val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
+    randomRDD.saveAsTextFile(tempDir.getPath + "/output")
+    assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)
+    randomRDD.saveAsTextFile(tempDir.getPath + "/output")
+    assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)
+  }
+
   test ("prevent user from overwriting the empty directory (new Hadoop API)") {
     sc = new SparkContext("local", "test")
     val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
@@ -248,6 +259,17 @@ class FileSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test ("allow user to disable the output directory existence checking (new Hadoop API") {
+    val sf = new SparkConf()
+    sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
+    sc = new SparkContext(sf)
+    val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output")
+    assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
+    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output")
+    assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
+  }
+
   test ("save Hadoop Dataset through old Hadoop API") {
     sc = new SparkContext("local", "test")
     val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)

http://git-wip-us.apache.org/repos/asf/spark/blob/89cdbb08/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 0697f7f..71fafa5 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -487,6 +487,14 @@ Apart from these, the following properties are also available, and may be useful
     this duration will be cleared as well.
   </td>
 </tr>
+<tr>
+    <td>spark.hadoop.validateOutputSpecs</td>
+    <td>true</td>
+    <td>If set to true, validates the output specification (e.g. checking if the output directory already exists)
+    used in saveAsHadoopFile and other variants. This can be disabled to silence exceptions due to pre-existing
+    output directories. We recommend that users do not disable this except when trying to achieve compatibility with
+    previous versions of Spark. Instead, use Hadoop's FileSystem API to delete output directories by hand.</td>
+</tr>
 </table>
 
 #### Networking
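
For the recommendation in the new configuration entry (delete the output directory by hand rather than disable validation), a rough sketch using Hadoop's FileSystem API, assuming an existing SparkContext sc and an illustrative path:

    import org.apache.hadoop.fs.{FileSystem, Path}

    val outputPath = new Path("/tmp/output")  // illustrative path
    val fs = FileSystem.get(sc.hadoopConfiguration)
    if (fs.exists(outputPath)) {
      fs.delete(outputPath, true)  // recursive delete of the old output
    }
    sc.parallelize(Seq(("key1", "a"), ("key2", "b")))
      .saveAsTextFile(outputPath.toString)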