Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/05/06 02:49:40 UTC

[GitHub] [spark] LantaoJin commented on a change in pull request #24527: [SPARK-27635][SQL] Prevent from splitting too many partitions smaller than row group size in Parquet file format

LantaoJin commented on a change in pull request #24527: [SPARK-27635][SQL] Prevent from splitting too many partitions smaller than row group size in Parquet file format
URL: https://github.com/apache/spark/pull/24527#discussion_r281052871
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 ##########
 @@ -420,8 +420,12 @@ case class FileSourceScanExec(
       selectedPartitions: Seq[PartitionDirectory],
       fsRelation: HadoopFsRelation): RDD[InternalRow] = {
     val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
-    val maxSplitBytes =
-      FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions)
+    val maxSplitBytes = relation.fileFormat match {
+      case _ : ParquetSource =>
+        fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes // parquet.block.size
+      case _ =>
+        FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions)
+    }
 
 Review comment:
   It may be hard to provide a UT. This case only happens in one of our jobs, which uses multiple threads to read from one HDFS folder and write to different target HDFS folders with different filters. With DRA (dynamic resource allocation) enabled, the job launched more and more executors, reaching nearly 8000 active tasks. After the job had run for a while, the task number of the filter/scan stages increased from 200 to over 5000, and we saw many logs like the following:
   
   > 19/04/29 06:13:48 INFO FileSourceScanExec: Planning scan with bin packing, max size: 129026539 bytes, open cost is considered as scanning 4194304 bytes.
   > 19/04/29 06:13:48 INFO FileSourceScanExec: Planning scan with bin packing, max size: 129026539 bytes, open cost is considered as scanning 4194304 bytes.
   > 19/04/29 06:13:49 INFO FileSourceScanExec: Planning scan with bin packing, max size: 129026539 bytes, open cost is considered as scanning 4194304 bytes.
   > 19/04/29 06:15:49 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4474908 bytes, open cost is considered as scanning 4194304 bytes.
   > 19/04/29 06:16:15 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
   > 19/04/29 06:16:21 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
   > 19/04/29 06:16:23 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
   > 19/04/29 06:16:23 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
   
   This issue goes away in any of these four cases:
   
   1. Set "spark.default.parallelism" to a fixed value.
   2. Disable DRA and set num-executors to a low value.
   3. The app cannot acquire enough resources to launch many executors.
   4. Run the jobs one by one instead of in multiple threads.
   
   All of the above keep the app from splitting into too many partitions, because fewer cores mean a larger `bytesPerCore` and hence a larger `maxSplitBytes`:
   ```scala
     override def defaultParallelism(): Int = { //  if not set, more resources, more cores
       conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
     }
   
     def maxSplitBytes(
         sparkSession: SparkSession,
         selectedPartitions: Seq[PartitionDirectory]): Long = {
       val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
       val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
       val defaultParallelism = sparkSession.sparkContext.defaultParallelism
       val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
       val bytesPerCore = totalBytes / defaultParallelism // more cores, less bytesPerCore
       // less bytesPerCore, less maxSplitBytes
       Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
     }
   
     def splitFiles(
         sparkSession: SparkSession,
         file: FileStatus,
         filePath: Path,
         isSplitable: Boolean,
         maxSplitBytes: Long,
         partitionValues: InternalRow): Seq[PartitionedFile] = {
       if (isSplitable) {
         (0L until file.getLen by maxSplitBytes).map { offset => // less maxSplitBytes, more partitions
           val remaining = file.getLen - offset
           val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
           val hosts = getBlockHosts(getBlockLocations(file), offset, size)
           PartitionedFile(partitionValues, filePath.toUri.toString, offset, size, hosts)
         }
       } else {
         Seq(getPartitionedFile(file, filePath, partitionValues))
       }
     }
   ```
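   To make the arithmetic concrete, here is a minimal, self-contained sketch of how a large `defaultParallelism` collapses `maxSplitBytes` down to the open-cost floor. The 25 GB total input and the 128 MB file below are illustrative assumptions, not numbers from the job above; the constants are the Spark defaults for `spark.sql.files.maxPartitionBytes` and `spark.sql.files.openCostInBytes`:
   ```scala
   object MaxSplitBytesDemo {
     // Spark defaults: spark.sql.files.maxPartitionBytes = 128 MB,
     // spark.sql.files.openCostInBytes = 4 MB.
     val defaultMaxSplitBytes: Long = 128L * 1024 * 1024 // 134217728
     val openCostInBytes: Long = 4L * 1024 * 1024        // 4194304

     // Same formula as FilePartition.maxSplitBytes quoted above.
     def maxSplitBytes(totalBytes: Long, defaultParallelism: Int): Long = {
       val bytesPerCore = totalBytes / defaultParallelism
       Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
     }

     def main(args: Array[String]): Unit = {
       val totalBytes = 25L * 1024 * 1024 * 1024 // assume a 25 GB scan

       // 200 cores: bytesPerCore = 128 MB, so the default cap wins.
       println(maxSplitBytes(totalBytes, 200))   // 134217728

       // 8000 cores after DRA ramps up: bytesPerCore is ~3.2 MB, so the
       // open-cost floor wins and every split shrinks to 4 MB.
       val small = maxSplitBytes(totalBytes, 8000)
       println(small)                            // 4194304

       // splitFiles then chops one 128 MB Parquet file (a single row
       // group at the default parquet.block.size) into 32 splits, each
       // smaller than the row group it must read.
       val fileLen = 128L * 1024 * 1024
       println((0L until fileLen by small).size) // 32
     }
   }
   ```
   With the change proposed in this diff, the Parquet branch would pin `maxSplitBytes` at `filesMaxPartitionBytes`, so the 4 MB collapse in the second case cannot happen.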
