You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by habren <gi...@git.apache.org> on 2018/08/16 01:32:22 UTC
[GitHub] spark pull request #21868: [SPARK-24906][SQL] Adaptively enlarge split / par...
Github user habren commented on a diff in the pull request:
https://github.com/apache/spark/pull/21868#discussion_r210456342
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -401,12 +399,41 @@ case class FileSourceScanExec(
fsRelation: HadoopFsRelation): RDD[InternalRow] = {
val defaultMaxSplitBytes =
fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
- val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
+ var openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
val bytesPerCore = totalBytes / defaultParallelism
- val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+ var maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+ if(fsRelation.fileFormat.isInstanceOf[ParquetSource] &&
+ fsRelation.sparkSession.sessionState.conf.isParquetSizeAdaptiveEnabled) {
+ if (relation.dataSchema.map(_.dataType).forall(dataType =>
+ dataType.isInstanceOf[CalendarIntervalType] || dataType.isInstanceOf[StructType]
+ || dataType.isInstanceOf[MapType] || dataType.isInstanceOf[NullType]
+ || dataType.isInstanceOf[AtomicType] || dataType.isInstanceOf[ArrayType])) {
+
+ def getTypeLength (dataType : DataType) : Int = {
+ if (dataType.isInstanceOf[StructType]) {
+ fsRelation.sparkSession.sessionState.conf.parquetStructTypeLength
+ } else if (dataType.isInstanceOf[ArrayType]) {
+ fsRelation.sparkSession.sessionState.conf.parquetArrayTypeLength
+ } else if (dataType.isInstanceOf[MapType]) {
+ fsRelation.sparkSession.sessionState.conf.parquetMapTypeLength
+ } else {
+ dataType.defaultSize
+ }
+ }
+
+ val selectedColumnSize = requiredSchema.map(_.dataType).map(getTypeLength(_))
+ .reduceOption(_ + _).getOrElse(StringType.defaultSize)
+ val totalColumnSize = relation.dataSchema.map(_.dataType).map(getTypeLength(_))
+ .reduceOption(_ + _).getOrElse(StringType.defaultSize)
+ val multiplier = totalColumnSize / selectedColumnSize
--- End diff --
@viirya It now also supports ORC. Please help review it.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org