Posted to reviews@spark.apache.org by habren <gi...@git.apache.org> on 2018/08/16 01:32:22 UTC

[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...

Github user habren commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21868#discussion_r210456342
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
    @@ -401,12 +399,41 @@ case class FileSourceScanExec(
           fsRelation: HadoopFsRelation): RDD[InternalRow] = {
         val defaultMaxSplitBytes =
           fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
    -    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
    +    var openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
         val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
         val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
         val bytesPerCore = totalBytes / defaultParallelism
     
    -    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
    +    var maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
    +    if(fsRelation.fileFormat.isInstanceOf[ParquetSource] &&
    +      fsRelation.sparkSession.sessionState.conf.isParquetSizeAdaptiveEnabled) {
    +      if (relation.dataSchema.map(_.dataType).forall(dataType =>
    +        dataType.isInstanceOf[CalendarIntervalType] || dataType.isInstanceOf[StructType]
    +          || dataType.isInstanceOf[MapType] || dataType.isInstanceOf[NullType]
    +          || dataType.isInstanceOf[AtomicType] || dataType.isInstanceOf[ArrayType])) {
    +
    +        def getTypeLength (dataType : DataType) : Int = {
    +          if (dataType.isInstanceOf[StructType]) {
    +            fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength
    +          } else if (dataType.isInstanceOf[ArrayType]) {
    +            fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength
    +          } else if (dataType.isInstanceOf[MapType]) {
    +            fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength
    +          } else {
    +            dataType.defaultSize
    +          }
    +        }
    +
    +        val selectedColumnSize = requiredSchema.map(_.dataType).map(getTypeLength(_))
    +          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
    +        val totalColumnSize = relation.dataSchema.map(_.dataType).map(getTypeLength(_))
    +          .reduceOption(_ + _).getOrElse(StringType.defaultSize)
    +        val multiplier = totalColumnSize / selectedColumnSize
    --- End diff --
    
    @viirya  Now it also supports ORC. Please help review it.


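    For readers following the thread: the change in the diff above scales the file split size by the ratio of the full row width to the width of the columns actually selected, so a column-pruned scan is handed proportionally more file bytes per task. Below is a minimal standalone sketch of that idea, assuming illustrative type-length constants and a hypothetical adaptiveMaxSplitBytes helper rather than the PR's actual SQLConf entries:

        import org.apache.spark.sql.types._

        object AdaptiveSplitSketch {
          // Illustrative stand-ins for the PR's configurable complex-type lengths;
          // not the real configuration keys added by the change.
          val structTypeLength: Int = 20
          val arrayTypeLength: Int = 20
          val mapTypeLength: Int = 40

          // Estimated per-row width contributed by one column of the given type.
          def getTypeLength(dataType: DataType): Int = dataType match {
            case _: StructType => structTypeLength
            case _: ArrayType  => arrayTypeLength
            case _: MapType    => mapTypeLength
            case other         => other.defaultSize
          }

          // Enlarge the split size by the ratio of full-row width to selected-column
          // width, so a column-pruned scan still reads roughly the configured number
          // of bytes of useful column data per task.
          def adaptiveMaxSplitBytes(
              maxSplitBytes: Long,
              dataSchema: StructType,
              requiredSchema: StructType): Long = {
            val selectedColumnSize = requiredSchema.map(f => getTypeLength(f.dataType)).sum.max(1)
            val totalColumnSize = dataSchema.map(f => getTypeLength(f.dataType)).sum.max(1)
            val multiplier = totalColumnSize / selectedColumnSize
            if (multiplier > 1) maxSplitBytes * multiplier else maxSplitBytes
          }
        }

    For instance, with a schema of 20 LongType columns and only 2 of them selected, the multiplier is 10, so each task would be assigned roughly ten times more file bytes while decoding about the same volume of column data.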
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org