Posted to commits@spark.apache.org by we...@apache.org on 2017/10/16 14:15:56 UTC

spark git commit: [SPARK-22233][CORE][FOLLOW-UP] Allow user to filter out empty split in HadoopRDD

Repository: spark
Updated Branches:
  refs/heads/master 0ae96495d -> 0fa10666c


[SPARK-22233][CORE][FOLLOW-UP] Allow user to filter out empty split in HadoopRDD

## What changes were proposed in this pull request?

Rename the config `spark.files.ignoreEmptySplits` to `spark.hadoopRDD.ignoreEmptySplits` and mark it internal.

This is a follow-up of #19464.
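
Because the renamed config is internal, it carries no public documentation. As a minimal sketch (not part of this commit; the app name and master are illustrative assumptions), it can still be enabled through its string key before the `SparkContext` is created:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Enable filtering of empty input splits; the key is the renamed,
// internal config introduced by this commit, default false.
val conf = new SparkConf()
  .setAppName("demo")
  .setMaster("local")
  .set("spark.hadoopRDD.ignoreEmptySplits", "true")
val sc = new SparkContext(conf)
```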

## How was this patch tested?

Existing tests.

Author: Xingbo Jiang <xi...@databricks.com>

Closes #19504 from jiangxb1987/partitionsplit.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0fa10666
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0fa10666
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0fa10666

Branch: refs/heads/master
Commit: 0fa10666cf75e3c4929940af49c8a6f6ea874759
Parents: 0ae9649
Author: Xingbo Jiang <xi...@databricks.com>
Authored: Mon Oct 16 22:15:50 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Oct 16 22:15:50 2017 +0800

----------------------------------------------------------------------
 .../org/apache/spark/internal/config/package.scala    | 11 ++++++-----
 .../main/scala/org/apache/spark/rdd/HadoopRDD.scala   |  4 ++--
 .../scala/org/apache/spark/rdd/NewHadoopRDD.scala     |  4 ++--
 core/src/test/scala/org/apache/spark/FileSuite.scala  | 14 +++++++++-----
 4 files changed, 19 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0fa10666/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index ce013d6..efffdca 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -270,11 +270,12 @@ package object config {
     .longConf
     .createWithDefault(4 * 1024 * 1024)
 
-  private[spark] val IGNORE_EMPTY_SPLITS = ConfigBuilder("spark.files.ignoreEmptySplits")
-    .doc("If true, methods that use HadoopRDD and NewHadoopRDD such as " +
-      "SparkContext.textFiles will not create a partition for input splits that are empty.")
-    .booleanConf
-    .createWithDefault(false)
+  private[spark] val HADOOP_RDD_IGNORE_EMPTY_SPLITS =
+    ConfigBuilder("spark.hadoopRDD.ignoreEmptySplits")
+      .internal()
+      .doc("When true, HadoopRDD/NewHadoopRDD will not create partitions for empty input splits.")
+      .booleanConf
+      .createWithDefault(false)
 
   private[spark] val SECRET_REDACTION_PATTERN =
     ConfigBuilder("spark.redaction.regex")

http://git-wip-us.apache.org/repos/asf/spark/blob/0fa10666/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 1f33c0a..2480559 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -35,7 +35,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{IGNORE_CORRUPT_FILES, IGNORE_EMPTY_SPLITS}
+import org.apache.spark.internal.config._
 import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
 import org.apache.spark.scheduler.{HDFSCacheTaskLocation, HostTaskLocation}
 import org.apache.spark.storage.StorageLevel
@@ -134,7 +134,7 @@ class HadoopRDD[K, V](
 
   private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
 
-  private val ignoreEmptySplits = sparkContext.getConf.get(IGNORE_EMPTY_SPLITS)
+  private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
 
   // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
   protected def getJobConf(): JobConf = {

http://git-wip-us.apache.org/repos/asf/spark/blob/0fa10666/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index db4eac1..e4dd1b6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -35,7 +35,7 @@ import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{IGNORE_CORRUPT_FILES, IGNORE_EMPTY_SPLITS}
+import org.apache.spark.internal.config._
 import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager}
@@ -90,7 +90,7 @@ class NewHadoopRDD[K, V](
 
   private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
 
-  private val ignoreEmptySplits = sparkContext.getConf.get(IGNORE_EMPTY_SPLITS)
+  private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
 
   def getConf: Configuration = {
     val conf: Configuration = confBroadcast.value.value

http://git-wip-us.apache.org/repos/asf/spark/blob/0fa10666/core/src/test/scala/org/apache/spark/FileSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 4da4323..e9539dc 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -31,7 +31,7 @@ import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}
 import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
 
-import org.apache.spark.internal.config.{IGNORE_CORRUPT_FILES, IGNORE_EMPTY_SPLITS}
+import org.apache.spark.internal.config._
 import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
@@ -510,9 +510,11 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
-  test("spark.files.ignoreEmptySplits work correctly (old Hadoop API)") {
+  test("spark.hadoopRDD.ignoreEmptySplits work correctly (old Hadoop API)") {
     val conf = new SparkConf()
-    conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
+      .setAppName("test")
+      .setMaster("local")
+      .set(HADOOP_RDD_IGNORE_EMPTY_SPLITS, true)
     sc = new SparkContext(conf)
 
     def testIgnoreEmptySplits(
@@ -549,9 +551,11 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
       expectedPartitionNum = 2)
   }
 
-  test("spark.files.ignoreEmptySplits work correctly (new Hadoop API)") {
+  test("spark.hadoopRDD.ignoreEmptySplits work correctly (new Hadoop API)") {
     val conf = new SparkConf()
-    conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
+      .setAppName("test")
+      .setMaster("local")
+      .set(HADOOP_RDD_IGNORE_EMPTY_SPLITS, true)
     sc = new SparkContext(conf)
 
     def testIgnoreEmptySplits(

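For reference, here is a self-contained sketch of the behavior these tests exercise (the file names, contents, and scratch directory are illustrative assumptions, not taken from the truncated test bodies): with the flag on, an empty input file contributes no partition.

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

object IgnoreEmptySplitsDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("test")
      .setMaster("local")
      .set("spark.hadoopRDD.ignoreEmptySplits", "true")
    val sc = new SparkContext(conf)

    // Create three input files, one of them empty.
    val dir = Files.createTempDirectory("empty-splits").toFile
    Files.write(new File(dir, "part-0").toPath, "a\n".getBytes)
    Files.write(new File(dir, "part-1").toPath, Array.emptyByteArray)
    Files.write(new File(dir, "part-2").toPath, "b\n".getBytes)

    // HadoopRDD drops the empty split, so only the two non-empty
    // files become partitions; without the flag there would be three.
    val rdd = sc.textFile(dir.getAbsolutePath)
    assert(rdd.getNumPartitions == 2)
    sc.stop()
  }
}
```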
