You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/10/16 14:15:56 UTC
spark git commit: [SPARK-22233][CORE][FOLLOW-UP] Allow user to filter
out empty split in HadoopRDD
Repository: spark
Updated Branches:
refs/heads/master 0ae96495d -> 0fa10666c
[SPARK-22233][CORE][FOLLOW-UP] Allow user to filter out empty split in HadoopRDD
## What changes were proposed in this pull request?
Update the config `spark.files.ignoreEmptySplits`, rename it and make it internal.
This is a follow-up of #19464
## How was this patch tested?
Existing tests.
Author: Xingbo Jiang <xi...@databricks.com>
Closes #19504 from jiangxb1987/partitionsplit.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0fa10666
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0fa10666
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0fa10666
Branch: refs/heads/master
Commit: 0fa10666cf75e3c4929940af49c8a6f6ea874759
Parents: 0ae9649
Author: Xingbo Jiang <xi...@databricks.com>
Authored: Mon Oct 16 22:15:50 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Oct 16 22:15:50 2017 +0800
----------------------------------------------------------------------
.../org/apache/spark/internal/config/package.scala | 11 ++++++-----
.../main/scala/org/apache/spark/rdd/HadoopRDD.scala | 4 ++--
.../scala/org/apache/spark/rdd/NewHadoopRDD.scala | 4 ++--
core/src/test/scala/org/apache/spark/FileSuite.scala | 14 +++++++++-----
4 files changed, 19 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/0fa10666/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index ce013d6..efffdca 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -270,11 +270,12 @@ package object config {
.longConf
.createWithDefault(4 * 1024 * 1024)
- private[spark] val IGNORE_EMPTY_SPLITS = ConfigBuilder("spark.files.ignoreEmptySplits")
- .doc("If true, methods that use HadoopRDD and NewHadoopRDD such as " +
- "SparkContext.textFiles will not create a partition for input splits that are empty.")
- .booleanConf
- .createWithDefault(false)
+ private[spark] val HADOOP_RDD_IGNORE_EMPTY_SPLITS =
+ ConfigBuilder("spark.hadoopRDD.ignoreEmptySplits")
+ .internal()
+ .doc("When true, HadoopRDD/NewHadoopRDD will not create partitions for empty input splits.")
+ .booleanConf
+ .createWithDefault(false)
private[spark] val SECRET_REDACTION_PATTERN =
ConfigBuilder("spark.redaction.regex")
http://git-wip-us.apache.org/repos/asf/spark/blob/0fa10666/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 1f33c0a..2480559 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -35,7 +35,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{IGNORE_CORRUPT_FILES, IGNORE_EMPTY_SPLITS}
+import org.apache.spark.internal.config._
import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
import org.apache.spark.scheduler.{HDFSCacheTaskLocation, HostTaskLocation}
import org.apache.spark.storage.StorageLevel
@@ -134,7 +134,7 @@ class HadoopRDD[K, V](
private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
- private val ignoreEmptySplits = sparkContext.getConf.get(IGNORE_EMPTY_SPLITS)
+ private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
protected def getJobConf(): JobConf = {
http://git-wip-us.apache.org/repos/asf/spark/blob/0fa10666/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index db4eac1..e4dd1b6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -35,7 +35,7 @@ import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{IGNORE_CORRUPT_FILES, IGNORE_EMPTY_SPLITS}
+import org.apache.spark.internal.config._
import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager}
@@ -90,7 +90,7 @@ class NewHadoopRDD[K, V](
private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
- private val ignoreEmptySplits = sparkContext.getConf.get(IGNORE_EMPTY_SPLITS)
+ private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
def getConf: Configuration = {
val conf: Configuration = confBroadcast.value.value
http://git-wip-us.apache.org/repos/asf/spark/blob/0fa10666/core/src/test/scala/org/apache/spark/FileSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 4da4323..e9539dc 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -31,7 +31,7 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
-import org.apache.spark.internal.config.{IGNORE_CORRUPT_FILES, IGNORE_EMPTY_SPLITS}
+import org.apache.spark.internal.config._
import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
@@ -510,9 +510,11 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
}
}
- test("spark.files.ignoreEmptySplits work correctly (old Hadoop API)") {
+ test("spark.hadoopRDD.ignoreEmptySplits work correctly (old Hadoop API)") {
val conf = new SparkConf()
- conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
+ .setAppName("test")
+ .setMaster("local")
+ .set(HADOOP_RDD_IGNORE_EMPTY_SPLITS, true)
sc = new SparkContext(conf)
def testIgnoreEmptySplits(
@@ -549,9 +551,11 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
expectedPartitionNum = 2)
}
- test("spark.files.ignoreEmptySplits work correctly (new Hadoop API)") {
+ test("spark.hadoopRDD.ignoreEmptySplits work correctly (new Hadoop API)") {
val conf = new SparkConf()
- conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
+ .setAppName("test")
+ .setMaster("local")
+ .set(HADOOP_RDD_IGNORE_EMPTY_SPLITS, true)
sc = new SparkContext(conf)
def testIgnoreEmptySplits(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org