You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/04/17 13:52:42 UTC

spark git commit: [SPARK-22676] Avoid iterating all partition paths when spark.sql.hive.verifyPartitionPath=true

Repository: spark
Updated Branches:
  refs/heads/master 0a9172a05 -> ed4101d29


[SPARK-22676] Avoid iterating all partition paths when spark.sql.hive.verifyPartitionPath=true

## What changes were proposed in this pull request?

In the current code, all partition paths are scanned when spark.sql.hive.verifyPartitionPath=true.
e.g. table like below:
```
CREATE TABLE `test`(
`id` int,
`age` int,
`name` string)
PARTITIONED BY (
`A` string,
`B` string)
load data local inpath '/tmp/data0' into table test partition(A='00', B='00')
load data local inpath '/tmp/data1' into table test partition(A='01', B='01')
load data local inpath '/tmp/data2' into table test partition(A='10', B='10')
load data local inpath '/tmp/data3' into table test partition(A='11', B='11')
```
If I query with SQL – "select * from test where A='00' and B='01'", the current code will scan all partition paths, including '/data/A=00/B=00', '/data/A=01/B=01', '/data/A=10/B=10' and '/data/A=11/B=11'. This costs a lot of time and memory.

This PR proposes to avoid iterating over all partition paths. It adds a config `spark.files.ignoreMissingFiles` and, when that config is enabled, ignores `file not found` errors raised in `getPartitions`/`compute` (for Hive table scans). This is much like the logic introduced by
`spark.sql.files.ignoreMissingFiles` (which is for datasource scans).

## How was this patch tested?
UT

Author: jinxing <ji...@126.com>

Closes #19868 from jinxing64/SPARK-22676.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ed4101d2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ed4101d2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ed4101d2

Branch: refs/heads/master
Commit: ed4101d29f50d54fd7846421e4c00e9ecd3599d0
Parents: 0a9172a
Author: jinxing <ji...@126.com>
Authored: Tue Apr 17 21:52:33 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Apr 17 21:52:33 2018 +0800

----------------------------------------------------------------------
 .../apache/spark/internal/config/package.scala  |  6 ++
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 43 ++++++++----
 .../org/apache/spark/rdd/NewHadoopRDD.scala     | 45 +++++++++----
 .../test/scala/org/apache/spark/FileSuite.scala | 69 +++++++++++++++++++-
 .../org/apache/spark/sql/internal/SQLConf.scala |  3 +-
 .../spark/sql/hive/QueryPartitionSuite.scala    | 40 ++++++++++++
 6 files changed, 181 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ed4101d2/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 407545a..99d779f 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -301,6 +301,12 @@ package object config {
     .booleanConf
     .createWithDefault(false)
 
+  private[spark] val IGNORE_MISSING_FILES = ConfigBuilder("spark.files.ignoreMissingFiles")
+    .doc("Whether to ignore missing files. If true, the Spark jobs will continue to run when " +
+        "encountering missing files and the contents that have been read will still be returned.")
+    .booleanConf
+    .createWithDefault(false)
+
   private[spark] val APP_CALLER_CONTEXT = ConfigBuilder("spark.log.callerContext")
     .stringConf
     .createOptional

http://git-wip-us.apache.org/repos/asf/spark/blob/ed4101d2/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 2480559..44895ab 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.rdd
 
-import java.io.IOException
+import java.io.{FileNotFoundException, IOException}
 import java.text.SimpleDateFormat
 import java.util.{Date, Locale}
 
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.mapred._
 import org.apache.hadoop.mapred.lib.CombineFileSplit
 import org.apache.hadoop.mapreduce.TaskType
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.hadoop.util.ReflectionUtils
 
 import org.apache.spark._
@@ -134,6 +135,8 @@ class HadoopRDD[K, V](
 
   private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
 
+  private val ignoreMissingFiles = sparkContext.conf.get(IGNORE_MISSING_FILES)
+
   private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
 
   // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
@@ -197,17 +200,24 @@ class HadoopRDD[K, V](
     val jobConf = getJobConf()
     // add the credentials here as this can be called before SparkContext initialized
     SparkHadoopUtil.get.addCredentials(jobConf)
-    val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
-    val inputSplits = if (ignoreEmptySplits) {
-      allInputSplits.filter(_.getLength > 0)
-    } else {
-      allInputSplits
-    }
-    val array = new Array[Partition](inputSplits.size)
-    for (i <- 0 until inputSplits.size) {
-      array(i) = new HadoopPartition(id, i, inputSplits(i))
+    try {
+      val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
+      val inputSplits = if (ignoreEmptySplits) {
+        allInputSplits.filter(_.getLength > 0)
+      } else {
+        allInputSplits
+      }
+      val array = new Array[Partition](inputSplits.size)
+      for (i <- 0 until inputSplits.size) {
+        array(i) = new HadoopPartition(id, i, inputSplits(i))
+      }
+      array
+    } catch {
+      case e: InvalidInputException if ignoreMissingFiles =>
+        logWarning(s"${jobConf.get(FileInputFormat.INPUT_DIR)} doesn't exist and no" +
+            s" partitions returned from this path.", e)
+        Array.empty[Partition]
     }
-    array
   }
 
   override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
@@ -256,6 +266,12 @@ class HadoopRDD[K, V](
         try {
           inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
         } catch {
+          case e: FileNotFoundException if ignoreMissingFiles =>
+            logWarning(s"Skipped missing file: ${split.inputSplit}", e)
+            finished = true
+            null
+          // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
+          case e: FileNotFoundException if !ignoreMissingFiles => throw e
           case e: IOException if ignoreCorruptFiles =>
             logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
             finished = true
@@ -276,6 +292,11 @@ class HadoopRDD[K, V](
         try {
           finished = !reader.next(key, value)
         } catch {
+          case e: FileNotFoundException if ignoreMissingFiles =>
+            logWarning(s"Skipped missing file: ${split.inputSplit}", e)
+            finished = true
+          // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
+          case e: FileNotFoundException if !ignoreMissingFiles => throw e
           case e: IOException if ignoreCorruptFiles =>
             logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
             finished = true

http://git-wip-us.apache.org/repos/asf/spark/blob/ed4101d2/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index e4dd1b6..ff66a04 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.rdd
 
-import java.io.IOException
+import java.io.{FileNotFoundException, IOException}
 import java.text.SimpleDateFormat
 import java.util.{Date, Locale}
 
@@ -28,7 +28,7 @@ import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
+import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileInputFormat, FileSplit, InvalidInputException}
 import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
 
 import org.apache.spark._
@@ -90,6 +90,8 @@ class NewHadoopRDD[K, V](
 
   private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
 
+  private val ignoreMissingFiles = sparkContext.conf.get(IGNORE_MISSING_FILES)
+
   private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
 
   def getConf: Configuration = {
@@ -124,17 +126,25 @@ class NewHadoopRDD[K, V](
         configurable.setConf(_conf)
       case _ =>
     }
-    val allRowSplits = inputFormat.getSplits(new JobContextImpl(_conf, jobId)).asScala
-    val rawSplits = if (ignoreEmptySplits) {
-      allRowSplits.filter(_.getLength > 0)
-    } else {
-      allRowSplits
-    }
-    val result = new Array[Partition](rawSplits.size)
-    for (i <- 0 until rawSplits.size) {
-      result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+    try {
+      val allRowSplits = inputFormat.getSplits(new JobContextImpl(_conf, jobId)).asScala
+      val rawSplits = if (ignoreEmptySplits) {
+        allRowSplits.filter(_.getLength > 0)
+      } else {
+        allRowSplits
+      }
+      val result = new Array[Partition](rawSplits.size)
+      for (i <- 0 until rawSplits.size) {
+        result(i) =
+            new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+      }
+      result
+    } catch {
+      case e: InvalidInputException if ignoreMissingFiles =>
+        logWarning(s"${_conf.get(FileInputFormat.INPUT_DIR)} doesn't exist and no" +
+            s" partitions returned from this path.", e)
+        Array.empty[Partition]
     }
-    result
   }
 
   override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
@@ -189,6 +199,12 @@ class NewHadoopRDD[K, V](
           _reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
           _reader
         } catch {
+          case e: FileNotFoundException if ignoreMissingFiles =>
+            logWarning(s"Skipped missing file: ${split.serializableHadoopSplit}", e)
+            finished = true
+            null
+          // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
+          case e: FileNotFoundException if !ignoreMissingFiles => throw e
           case e: IOException if ignoreCorruptFiles =>
             logWarning(
               s"Skipped the rest content in the corrupted file: ${split.serializableHadoopSplit}",
@@ -213,6 +229,11 @@ class NewHadoopRDD[K, V](
           try {
             finished = !reader.nextKeyValue
           } catch {
+            case e: FileNotFoundException if ignoreMissingFiles =>
+              logWarning(s"Skipped missing file: ${split.serializableHadoopSplit}", e)
+              finished = true
+            // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
+            case e: FileNotFoundException if !ignoreMissingFiles => throw e
             case e: IOException if ignoreCorruptFiles =>
               logWarning(
                 s"Skipped the rest content in the corrupted file: ${split.serializableHadoopSplit}",

http://git-wip-us.apache.org/repos/asf/spark/blob/ed4101d2/core/src/test/scala/org/apache/spark/FileSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 55a9122..a441b9c 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -23,6 +23,7 @@ import java.util.zip.GZIPOutputStream
 
 import scala.io.Source
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io._
 import org.apache.hadoop.io.compress.DefaultCodec
@@ -32,7 +33,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInp
 import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
 
 import org.apache.spark.internal.config._
-import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
+import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD, RDD}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
 
@@ -596,4 +597,70 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
       actualPartitionNum = 5,
       expectedPartitionNum = 2)
   }
+
+  test("spark.files.ignoreMissingFiles should work both HadoopRDD and NewHadoopRDD") {
+    // "file not found" can happen both when getPartitions or compute in HadoopRDD/NewHadoopRDD,
+    // We test both cases here.
+
+    val deletedPath = new Path(tempDir.getAbsolutePath, "test-data-1")
+    val fs = deletedPath.getFileSystem(new Configuration())
+    fs.delete(deletedPath, true)
+    intercept[FileNotFoundException](fs.open(deletedPath))
+
+    def collectRDDAndDeleteFileBeforeCompute(newApi: Boolean): Array[_] = {
+      val dataPath = new Path(tempDir.getAbsolutePath, "test-data-2")
+      val writer = new OutputStreamWriter(new FileOutputStream(new File(dataPath.toString)))
+      writer.write("hello\n")
+      writer.write("world\n")
+      writer.close()
+      val rdd = if (newApi) {
+        sc.newAPIHadoopFile(dataPath.toString, classOf[NewTextInputFormat],
+          classOf[LongWritable], classOf[Text])
+      } else {
+        sc.textFile(dataPath.toString)
+      }
+      rdd.partitions
+      fs.delete(dataPath, true)
+      // Exception happens when initialize record reader in HadoopRDD/NewHadoopRDD.compute
+      // because partitions' info already cached.
+      rdd.collect()
+    }
+
+    // collect HadoopRDD and NewHadoopRDD when spark.files.ignoreMissingFiles=false by default.
+    sc = new SparkContext("local", "test")
+    intercept[org.apache.hadoop.mapred.InvalidInputException] {
+      // Exception happens when HadoopRDD.getPartitions
+      sc.textFile(deletedPath.toString).collect()
+    }
+
+    var e = intercept[SparkException] {
+      collectRDDAndDeleteFileBeforeCompute(false)
+    }
+    assert(e.getCause.isInstanceOf[java.io.FileNotFoundException])
+
+    intercept[org.apache.hadoop.mapreduce.lib.input.InvalidInputException] {
+      // Exception happens when NewHadoopRDD.getPartitions
+      sc.newAPIHadoopFile(deletedPath.toString, classOf[NewTextInputFormat],
+        classOf[LongWritable], classOf[Text]).collect
+    }
+
+    e = intercept[SparkException] {
+      collectRDDAndDeleteFileBeforeCompute(true)
+    }
+    assert(e.getCause.isInstanceOf[java.io.FileNotFoundException])
+
+    sc.stop()
+
+    // collect HadoopRDD and NewHadoopRDD when spark.files.ignoreMissingFiles=true.
+    val conf = new SparkConf().set(IGNORE_MISSING_FILES, true)
+    sc = new SparkContext("local", "test", conf)
+    assert(sc.textFile(deletedPath.toString).collect().isEmpty)
+
+    assert(collectRDDAndDeleteFileBeforeCompute(false).isEmpty)
+
+    assert(sc.newAPIHadoopFile(deletedPath.toString, classOf[NewTextInputFormat],
+      classOf[LongWritable], classOf[Text]).collect().isEmpty)
+
+    assert(collectRDDAndDeleteFileBeforeCompute(true).isEmpty)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ed4101d2/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 0dc47bf..3729bd5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -437,7 +437,8 @@ object SQLConf {
 
   val HIVE_VERIFY_PARTITION_PATH = buildConf("spark.sql.hive.verifyPartitionPath")
     .doc("When true, check all the partition paths under the table\'s root directory " +
-         "when reading data stored in HDFS.")
+         "when reading data stored in HDFS. This configuration will be deprecated in the future " +
+         "releases and replaced by spark.files.ignoreMissingFiles.")
     .booleanConf
     .createWithDefault(false)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ed4101d2/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
index b2dc401..78156b1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
@@ -23,6 +23,7 @@ import java.sql.Timestamp
 import com.google.common.io.Files
 import org.apache.hadoop.fs.FileSystem
 
+import org.apache.spark.internal.config._
 import org.apache.spark.sql._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
@@ -70,6 +71,45 @@ class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingl
     }
   }
 
+  test("Replace spark.sql.hive.verifyPartitionPath by spark.files.ignoreMissingFiles") {
+    withSQLConf((SQLConf.HIVE_VERIFY_PARTITION_PATH.key, "false")) {
+      sparkContext.conf.set(IGNORE_MISSING_FILES.key, "true")
+      val testData = sparkContext.parallelize(
+        (1 to 10).map(i => TestData(i, i.toString))).toDF()
+      testData.createOrReplaceTempView("testData")
+
+      val tmpDir = Files.createTempDir()
+      // create the table for test
+      sql(s"CREATE TABLE table_with_partition(key int,value string) " +
+          s"PARTITIONED by (ds string) location '${tmpDir.toURI}' ")
+      sql("INSERT OVERWRITE TABLE table_with_partition  partition (ds='1') " +
+          "SELECT key,value FROM testData")
+      sql("INSERT OVERWRITE TABLE table_with_partition  partition (ds='2') " +
+          "SELECT key,value FROM testData")
+      sql("INSERT OVERWRITE TABLE table_with_partition  partition (ds='3') " +
+          "SELECT key,value FROM testData")
+      sql("INSERT OVERWRITE TABLE table_with_partition  partition (ds='4') " +
+          "SELECT key,value FROM testData")
+
+      // test for the exist path
+      checkAnswer(sql("select key,value from table_with_partition"),
+        testData.toDF.collect ++ testData.toDF.collect
+            ++ testData.toDF.collect ++ testData.toDF.collect)
+
+      // delete the path of one partition
+      tmpDir.listFiles
+          .find { f => f.isDirectory && f.getName().startsWith("ds=") }
+          .foreach { f => Utils.deleteRecursively(f) }
+
+      // test for after delete the path
+      checkAnswer(sql("select key,value from table_with_partition"),
+        testData.toDF.collect ++ testData.toDF.collect ++ testData.toDF.collect)
+
+      sql("DROP TABLE IF EXISTS table_with_partition")
+      sql("DROP TABLE IF EXISTS createAndInsertTest")
+    }
+  }
+
   test("SPARK-21739: Cast expression should initialize timezoneId") {
     withTable("table_with_timestamp_partition") {
       sql("CREATE TABLE table_with_timestamp_partition(value int) PARTITIONED BY (ts TIMESTAMP)")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org