You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/04/17 13:52:42 UTC
spark git commit: [SPARK-22676] Avoid iterating all partition paths
when spark.sql.hive.verifyPartitionPath=true
Repository: spark
Updated Branches:
refs/heads/master 0a9172a05 -> ed4101d29
[SPARK-22676] Avoid iterating all partition paths when spark.sql.hive.verifyPartitionPath=true
## What changes were proposed in this pull request?
In current code, it will scan all partition paths when spark.sql.hive.verifyPartitionPath=true.
e.g. table like below:
```
CREATE TABLE `test`(
`id` int,
`age` int,
`name` string)
PARTITIONED BY (
`A` string,
`B` string)
load data local inpath '/tmp/data0' into table test partition(A='00', B='00')
load data local inpath '/tmp/data1' into table test partition(A='01', B='01')
load data local inpath '/tmp/data2' into table test partition(A='10', B='10')
load data local inpath '/tmp/data3' into table test partition(A='11', B='11')
```
If I query with SQL – "select * from test where A='00' and B='01' ", current code will scan all partition paths including '/data/A=00/B=00', '/data/A=01/B=01', '/data/A=10/B=10', '/data/A=11/B=11'. It costs much time and memory.
This PR proposes to avoid iterating all partition paths. It adds a config `spark.files.ignoreMissingFiles` and ignores `file not found` exceptions during `getPartitions`/`compute` (for Hive table scan). This is much like the logic brought by
`spark.sql.files.ignoreMissingFiles` (which is for datasource scan).
## How was this patch tested?
UT
Author: jinxing <ji...@126.com>
Closes #19868 from jinxing64/SPARK-22676.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ed4101d2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ed4101d2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ed4101d2
Branch: refs/heads/master
Commit: ed4101d29f50d54fd7846421e4c00e9ecd3599d0
Parents: 0a9172a
Author: jinxing <ji...@126.com>
Authored: Tue Apr 17 21:52:33 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Apr 17 21:52:33 2018 +0800
----------------------------------------------------------------------
.../apache/spark/internal/config/package.scala | 6 ++
.../scala/org/apache/spark/rdd/HadoopRDD.scala | 43 ++++++++----
.../org/apache/spark/rdd/NewHadoopRDD.scala | 45 +++++++++----
.../test/scala/org/apache/spark/FileSuite.scala | 69 +++++++++++++++++++-
.../org/apache/spark/sql/internal/SQLConf.scala | 3 +-
.../spark/sql/hive/QueryPartitionSuite.scala | 40 ++++++++++++
6 files changed, 181 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/ed4101d2/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 407545a..99d779f 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -301,6 +301,12 @@ package object config {
.booleanConf
.createWithDefault(false)
+ private[spark] val IGNORE_MISSING_FILES = ConfigBuilder("spark.files.ignoreMissingFiles")
+ .doc("Whether to ignore missing files. If true, the Spark jobs will continue to run when " +
+ "encountering missing files and the contents that have been read will still be returned.")
+ .booleanConf
+ .createWithDefault(false)
+
private[spark] val APP_CALLER_CONTEXT = ConfigBuilder("spark.log.callerContext")
.stringConf
.createOptional
http://git-wip-us.apache.org/repos/asf/spark/blob/ed4101d2/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 2480559..44895ab 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -17,7 +17,7 @@
package org.apache.spark.rdd
-import java.io.IOException
+import java.io.{FileNotFoundException, IOException}
import java.text.SimpleDateFormat
import java.util.{Date, Locale}
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.mapred._
import org.apache.hadoop.mapred.lib.CombineFileSplit
import org.apache.hadoop.mapreduce.TaskType
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.util.ReflectionUtils
import org.apache.spark._
@@ -134,6 +135,8 @@ class HadoopRDD[K, V](
private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
+ private val ignoreMissingFiles = sparkContext.conf.get(IGNORE_MISSING_FILES)
+
private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
@@ -197,17 +200,24 @@ class HadoopRDD[K, V](
val jobConf = getJobConf()
// add the credentials here as this can be called before SparkContext initialized
SparkHadoopUtil.get.addCredentials(jobConf)
- val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
- val inputSplits = if (ignoreEmptySplits) {
- allInputSplits.filter(_.getLength > 0)
- } else {
- allInputSplits
- }
- val array = new Array[Partition](inputSplits.size)
- for (i <- 0 until inputSplits.size) {
- array(i) = new HadoopPartition(id, i, inputSplits(i))
+ try {
+ val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
+ val inputSplits = if (ignoreEmptySplits) {
+ allInputSplits.filter(_.getLength > 0)
+ } else {
+ allInputSplits
+ }
+ val array = new Array[Partition](inputSplits.size)
+ for (i <- 0 until inputSplits.size) {
+ array(i) = new HadoopPartition(id, i, inputSplits(i))
+ }
+ array
+ } catch {
+ case e: InvalidInputException if ignoreMissingFiles =>
+ logWarning(s"${jobConf.get(FileInputFormat.INPUT_DIR)} doesn't exist and no" +
+ s" partitions returned from this path.", e)
+ Array.empty[Partition]
}
- array
}
override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
@@ -256,6 +266,12 @@ class HadoopRDD[K, V](
try {
inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
} catch {
+ case e: FileNotFoundException if ignoreMissingFiles =>
+ logWarning(s"Skipped missing file: ${split.inputSplit}", e)
+ finished = true
+ null
+ // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
+ case e: FileNotFoundException if !ignoreMissingFiles => throw e
case e: IOException if ignoreCorruptFiles =>
logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
finished = true
@@ -276,6 +292,11 @@ class HadoopRDD[K, V](
try {
finished = !reader.next(key, value)
} catch {
+ case e: FileNotFoundException if ignoreMissingFiles =>
+ logWarning(s"Skipped missing file: ${split.inputSplit}", e)
+ finished = true
+ // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
+ case e: FileNotFoundException if !ignoreMissingFiles => throw e
case e: IOException if ignoreCorruptFiles =>
logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
finished = true
http://git-wip-us.apache.org/repos/asf/spark/blob/ed4101d2/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index e4dd1b6..ff66a04 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -17,7 +17,7 @@
package org.apache.spark.rdd
-import java.io.IOException
+import java.io.{FileNotFoundException, IOException}
import java.text.SimpleDateFormat
import java.util.{Date, Locale}
@@ -28,7 +28,7 @@ import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
+import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileInputFormat, FileSplit, InvalidInputException}
import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
import org.apache.spark._
@@ -90,6 +90,8 @@ class NewHadoopRDD[K, V](
private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
+ private val ignoreMissingFiles = sparkContext.conf.get(IGNORE_MISSING_FILES)
+
private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
def getConf: Configuration = {
@@ -124,17 +126,25 @@ class NewHadoopRDD[K, V](
configurable.setConf(_conf)
case _ =>
}
- val allRowSplits = inputFormat.getSplits(new JobContextImpl(_conf, jobId)).asScala
- val rawSplits = if (ignoreEmptySplits) {
- allRowSplits.filter(_.getLength > 0)
- } else {
- allRowSplits
- }
- val result = new Array[Partition](rawSplits.size)
- for (i <- 0 until rawSplits.size) {
- result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+ try {
+ val allRowSplits = inputFormat.getSplits(new JobContextImpl(_conf, jobId)).asScala
+ val rawSplits = if (ignoreEmptySplits) {
+ allRowSplits.filter(_.getLength > 0)
+ } else {
+ allRowSplits
+ }
+ val result = new Array[Partition](rawSplits.size)
+ for (i <- 0 until rawSplits.size) {
+ result(i) =
+ new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+ }
+ result
+ } catch {
+ case e: InvalidInputException if ignoreMissingFiles =>
+ logWarning(s"${_conf.get(FileInputFormat.INPUT_DIR)} doesn't exist and no" +
+ s" partitions returned from this path.", e)
+ Array.empty[Partition]
}
- result
}
override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
@@ -189,6 +199,12 @@ class NewHadoopRDD[K, V](
_reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
_reader
} catch {
+ case e: FileNotFoundException if ignoreMissingFiles =>
+ logWarning(s"Skipped missing file: ${split.serializableHadoopSplit}", e)
+ finished = true
+ null
+ // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
+ case e: FileNotFoundException if !ignoreMissingFiles => throw e
case e: IOException if ignoreCorruptFiles =>
logWarning(
s"Skipped the rest content in the corrupted file: ${split.serializableHadoopSplit}",
@@ -213,6 +229,11 @@ class NewHadoopRDD[K, V](
try {
finished = !reader.nextKeyValue
} catch {
+ case e: FileNotFoundException if ignoreMissingFiles =>
+ logWarning(s"Skipped missing file: ${split.serializableHadoopSplit}", e)
+ finished = true
+ // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
+ case e: FileNotFoundException if !ignoreMissingFiles => throw e
case e: IOException if ignoreCorruptFiles =>
logWarning(
s"Skipped the rest content in the corrupted file: ${split.serializableHadoopSplit}",
http://git-wip-us.apache.org/repos/asf/spark/blob/ed4101d2/core/src/test/scala/org/apache/spark/FileSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 55a9122..a441b9c 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -23,6 +23,7 @@ import java.util.zip.GZIPOutputStream
import scala.io.Source
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io._
import org.apache.hadoop.io.compress.DefaultCodec
@@ -32,7 +33,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInp
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
import org.apache.spark.internal.config._
-import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
+import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD, RDD}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
@@ -596,4 +597,70 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
actualPartitionNum = 5,
expectedPartitionNum = 2)
}
+
+ test("spark.files.ignoreMissingFiles should work both HadoopRDD and NewHadoopRDD") {
+ // "file not found" can happen both when getPartitions or compute in HadoopRDD/NewHadoopRDD,
+ // We test both cases here.
+
+ val deletedPath = new Path(tempDir.getAbsolutePath, "test-data-1")
+ val fs = deletedPath.getFileSystem(new Configuration())
+ fs.delete(deletedPath, true)
+ intercept[FileNotFoundException](fs.open(deletedPath))
+
+ def collectRDDAndDeleteFileBeforeCompute(newApi: Boolean): Array[_] = {
+ val dataPath = new Path(tempDir.getAbsolutePath, "test-data-2")
+ val writer = new OutputStreamWriter(new FileOutputStream(new File(dataPath.toString)))
+ writer.write("hello\n")
+ writer.write("world\n")
+ writer.close()
+ val rdd = if (newApi) {
+ sc.newAPIHadoopFile(dataPath.toString, classOf[NewTextInputFormat],
+ classOf[LongWritable], classOf[Text])
+ } else {
+ sc.textFile(dataPath.toString)
+ }
+ rdd.partitions
+ fs.delete(dataPath, true)
+ // Exception happens when initialize record reader in HadoopRDD/NewHadoopRDD.compute
+ // because partitions' info already cached.
+ rdd.collect()
+ }
+
+ // collect HadoopRDD and NewHadoopRDD when spark.files.ignoreMissingFiles=false by default.
+ sc = new SparkContext("local", "test")
+ intercept[org.apache.hadoop.mapred.InvalidInputException] {
+ // Exception happens when HadoopRDD.getPartitions
+ sc.textFile(deletedPath.toString).collect()
+ }
+
+ var e = intercept[SparkException] {
+ collectRDDAndDeleteFileBeforeCompute(false)
+ }
+ assert(e.getCause.isInstanceOf[java.io.FileNotFoundException])
+
+ intercept[org.apache.hadoop.mapreduce.lib.input.InvalidInputException] {
+ // Exception happens when NewHadoopRDD.getPartitions
+ sc.newAPIHadoopFile(deletedPath.toString, classOf[NewTextInputFormat],
+ classOf[LongWritable], classOf[Text]).collect
+ }
+
+ e = intercept[SparkException] {
+ collectRDDAndDeleteFileBeforeCompute(true)
+ }
+ assert(e.getCause.isInstanceOf[java.io.FileNotFoundException])
+
+ sc.stop()
+
+ // collect HadoopRDD and NewHadoopRDD when spark.files.ignoreMissingFiles=true.
+ val conf = new SparkConf().set(IGNORE_MISSING_FILES, true)
+ sc = new SparkContext("local", "test", conf)
+ assert(sc.textFile(deletedPath.toString).collect().isEmpty)
+
+ assert(collectRDDAndDeleteFileBeforeCompute(false).isEmpty)
+
+ assert(sc.newAPIHadoopFile(deletedPath.toString, classOf[NewTextInputFormat],
+ classOf[LongWritable], classOf[Text]).collect().isEmpty)
+
+ assert(collectRDDAndDeleteFileBeforeCompute(true).isEmpty)
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/ed4101d2/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 0dc47bf..3729bd5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -437,7 +437,8 @@ object SQLConf {
val HIVE_VERIFY_PARTITION_PATH = buildConf("spark.sql.hive.verifyPartitionPath")
.doc("When true, check all the partition paths under the table\'s root directory " +
- "when reading data stored in HDFS.")
+ "when reading data stored in HDFS. This configuration will be deprecated in the future " +
+ "releases and replaced by spark.files.ignoreMissingFiles.")
.booleanConf
.createWithDefault(false)
http://git-wip-us.apache.org/repos/asf/spark/blob/ed4101d2/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
index b2dc401..78156b1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
@@ -23,6 +23,7 @@ import java.sql.Timestamp
import com.google.common.io.Files
import org.apache.hadoop.fs.FileSystem
+import org.apache.spark.internal.config._
import org.apache.spark.sql._
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
@@ -70,6 +71,45 @@ class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingl
}
}
+ test("Replace spark.sql.hive.verifyPartitionPath by spark.files.ignoreMissingFiles") {
+ withSQLConf((SQLConf.HIVE_VERIFY_PARTITION_PATH.key, "false")) {
+ sparkContext.conf.set(IGNORE_MISSING_FILES.key, "true")
+ val testData = sparkContext.parallelize(
+ (1 to 10).map(i => TestData(i, i.toString))).toDF()
+ testData.createOrReplaceTempView("testData")
+
+ val tmpDir = Files.createTempDir()
+ // create the table for test
+ sql(s"CREATE TABLE table_with_partition(key int,value string) " +
+ s"PARTITIONED by (ds string) location '${tmpDir.toURI}' ")
+ sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='1') " +
+ "SELECT key,value FROM testData")
+ sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='2') " +
+ "SELECT key,value FROM testData")
+ sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='3') " +
+ "SELECT key,value FROM testData")
+ sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='4') " +
+ "SELECT key,value FROM testData")
+
+ // test for the exist path
+ checkAnswer(sql("select key,value from table_with_partition"),
+ testData.toDF.collect ++ testData.toDF.collect
+ ++ testData.toDF.collect ++ testData.toDF.collect)
+
+ // delete the path of one partition
+ tmpDir.listFiles
+ .find { f => f.isDirectory && f.getName().startsWith("ds=") }
+ .foreach { f => Utils.deleteRecursively(f) }
+
+ // test for after delete the path
+ checkAnswer(sql("select key,value from table_with_partition"),
+ testData.toDF.collect ++ testData.toDF.collect ++ testData.toDF.collect)
+
+ sql("DROP TABLE IF EXISTS table_with_partition")
+ sql("DROP TABLE IF EXISTS createAndInsertTest")
+ }
+ }
+
test("SPARK-21739: Cast expression should initialize timezoneId") {
withTable("table_with_timestamp_partition") {
sql("CREATE TABLE table_with_timestamp_partition(value int) PARTITIONED BY (ts TIMESTAMP)")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org