Posted to commits@spark.apache.org by li...@apache.org on 2017/03/01 16:16:43 UTC

spark git commit: [SPARK-19761][SQL] Creating an InMemoryFileIndex with empty rootPaths fails when PARALLEL_PARTITION_DISCOVERY_THRESHOLD is set to zero

Repository: spark
Updated Branches:
  refs/heads/master 5502a9cf8 -> 8aa560b75


[SPARK-19761][SQL] Creating an InMemoryFileIndex with empty rootPaths fails when PARALLEL_PARTITION_DISCOVERY_THRESHOLD is set to zero

## What changes were proposed in this pull request?

If we create an InMemoryFileIndex with empty rootPaths while PARALLEL_PARTITION_DISCOVERY_THRESHOLD is set to zero, it throws an exception:

```
Positive number of slices required
java.lang.IllegalArgumentException: Positive number of slices required
        at org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:119)
        at org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2084)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
        at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$.org$apache$spark$sql$execution$datasources$PartitioningAwareFileIndex$$bulkListLeafFiles(PartitioningAwareFileIndex.scala:357)
        at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.listLeafFiles(PartitioningAwareFileIndex.scala:256)
        at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:74)
        at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:50)
        at org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9$$anonfun$apply$mcV$sp$2.apply$mcV$sp(FileIndexSuite.scala:186)
        at org.apache.spark.sql.test.SQLTestUtils$class.withSQLConf(SQLTestUtils.scala:105)
        at org.apache.spark.sql.execution.datasources.FileIndexSuite.withSQLConf(FileIndexSuite.scala:33)
        at org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9.apply$mcV$sp(FileIndexSuite.scala:185)
        at org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9.apply(FileIndexSuite.scala:185)
        at org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9.apply(FileIndexSuite.scala:185)
        at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
        at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
```
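
For reference, a minimal reproduction sketch (assuming an active `spark: SparkSession`, e.g. in a Spark shell or a `SharedSQLContext` test; before this patch the constructor's eager file listing threw the exception above):

```scala
import org.apache.spark.sql.execution.datasources.InMemoryFileIndex

// With the threshold at 0, an empty rootPaths list used to be routed to the
// parallel listing branch, which builds an RDD with zero slices and fails.
spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.threshold", "0")
new InMemoryFileIndex(spark, Seq.empty, Map.empty, None)
```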

## How was this patch tested?
A unit test was added.

Author: windpiger <so...@outlook.com>

Closes #17093 from windpiger/fixEmptiPathInBulkListFiles.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8aa560b7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8aa560b7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8aa560b7

Branch: refs/heads/master
Commit: 8aa560b75e6b083b2a890c52301414285ba35c3d
Parents: 5502a9c
Author: windpiger <so...@outlook.com>
Authored: Wed Mar 1 08:16:29 2017 -0800
Committer: Xiao Li <ga...@gmail.com>
Committed: Wed Mar 1 08:16:29 2017 -0800

----------------------------------------------------------------------
 .../datasources/PartitioningAwareFileIndex.scala    |  2 +-
 .../org/apache/spark/sql/internal/SQLConf.scala     |  6 ++++--
 .../sql/execution/datasources/FileIndexSuite.scala  | 16 ++++++++++++++++
 3 files changed, 21 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8aa560b7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 75f87a5..549257c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -300,7 +300,7 @@ object PartitioningAwareFileIndex extends Logging {
       sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
 
     // Short-circuits parallel listing when serial listing is likely to be faster.
-    if (paths.size < sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
+    if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
       return paths.map { path =>
         (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
       }
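
The one-character change above is the core of the fix: with the threshold at zero and no root paths, the old strict `<` comparison sent the empty path list to the parallel branch, where parallelizing zero paths yields an RDD with zero slices (hence "Positive number of slices required"). A small sketch of the boundary logic (`shouldListSerially` is a hypothetical name, not a function in the patch; it only isolates the changed comparison):

```scala
// Sketch of the fixed comparison, runnable in a Scala REPL.
def shouldListSerially(numPaths: Int, threshold: Int): Boolean =
  numPaths <= threshold  // was: numPaths < threshold

assert(shouldListSerially(0, 0))    // empty rootPaths now short-circuits to serial listing
assert(!shouldListSerially(33, 32)) // listings above the threshold still go parallel
```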

http://git-wip-us.apache.org/repos/asf/spark/blob/8aa560b7/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index dc0f130..461dfe3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -402,11 +402,13 @@ object SQLConf {
 
   val PARALLEL_PARTITION_DISCOVERY_THRESHOLD =
     buildConf("spark.sql.sources.parallelPartitionDiscovery.threshold")
-      .doc("The maximum number of files allowed for listing files at driver side. If the number " +
-        "of detected files exceeds this value during partition discovery, it tries to list the " +
+      .doc("The maximum number of paths allowed for listing files at driver side. If the number " +
+        "of detected paths exceeds this value during partition discovery, it tries to list the " +
         "files with another Spark distributed job. This applies to Parquet, ORC, CSV, JSON and " +
         "LibSVM data sources.")
       .intConf
+      .checkValue(parallel => parallel >= 0, "The maximum number of paths allowed for listing " +
+        "files at driver side must not be negative")
       .createWithDefault(32)
 
   val PARALLEL_PARTITION_DISCOVERY_PARALLELISM =
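
With the new `checkValue`, zero stays a legal threshold (it sends any non-empty listing down the parallel path while still handling empty rootPaths), whereas negative values are rejected when the conf is set. A hedged usage sketch (assuming an active `spark: SparkSession`):

```scala
// Zero is accepted: only the serial fast path for non-empty listings is disabled.
spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.threshold", "0")

// A negative value now fails at set time with the message asserted in the
// new test below:
// java.lang.IllegalArgumentException: The maximum number of paths allowed
// for listing files at driver side must not be negative
spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.threshold", "-1")
```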

http://git-wip-us.apache.org/repos/asf/spark/blob/8aa560b7/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index efbfc24..7ea4064 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 
 class FileIndexSuite extends SharedSQLContext {
@@ -179,6 +180,21 @@ class FileIndexSuite extends SharedSQLContext {
     }
   }
 
+  test("InMemoryFileIndex with empty rootPaths when PARALLEL_PARTITION_DISCOVERY_THRESHOLD" +
+    "is a nonpositive number") {
+    withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "0") {
+      new InMemoryFileIndex(spark, Seq.empty, Map.empty, None)
+    }
+
+    val e = intercept[IllegalArgumentException] {
+      withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "-1") {
+        new InMemoryFileIndex(spark, Seq.empty, Map.empty, None)
+      }
+    }.getMessage
+    assert(e.contains("The maximum number of paths allowed for listing files at " +
+      "driver side must not be negative"))
+  }
+
   test("refresh for InMemoryFileIndex with FileStatusCache") {
     withTempDir { dir =>
       val fileStatusCache = FileStatusCache.getOrCreate(spark)

