You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/03/19 00:34:38 UTC

[GitHub] [hudi] XuQianJin-Stars commented on a change in pull request #4877: [HUDI-3457] Refactored Spark DataSource Relations to avoid code duplication

XuQianJin-Stars commented on a change in pull request #4877:
URL: https://github.com/apache/hudi/pull/4877#discussion_r830415829



##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
##########
@@ -130,22 +159,129 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
    * NOTE: DO NOT OVERRIDE THIS METHOD
    */
   override final def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    // NOTE: In case list of requested columns doesn't contain the Primary Key one, we
+    //       have to add it explicitly so that
+    //          - Merging could be performed correctly
+    //          - In case 0 columns are to be fetched (for ex, when doing {@code count()} on Spark's [[Dataset]]),
+    //          Spark still fetches all the rows to execute the query correctly
+    //
+    //       It's okay to return columns that have not been requested by the caller, as those nevertheless will be
+    //       filtered out upstream
+    val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
+
+    val (requiredAvroSchema, requiredStructSchema) =
+      HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns)
+
+    val filterExpressions = convertToExpressions(filters)
+    val (partitionFilters, dataFilters) = filterExpressions.partition(isPartitionPredicate)
+
+    val fileSplits = collectFileSplits(partitionFilters, dataFilters)
+
+    val partitionSchema = StructType(Nil)
+    val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchema.toString)
+    val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString)
+
     // Here we rely on a type erasure, to workaround inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
     // Please check [[needConversion]] scala-doc for more details
-    doBuildScan(requiredColumns, filters).asInstanceOf[RDD[Row]]
+    if (fileSplits.nonEmpty)
+      composeRDD(fileSplits, partitionSchema, tableSchema, requiredSchema, filters).asInstanceOf[RDD[Row]]
+    else
+      sparkSession.sparkContext.emptyRDD
+  }
+
+  /**
+   * Composes an RDD from the provided file splits to read from, the table and partition schemas, and the data filters to be applied
+   *
+   * @param fileSplits      file splits to be handled by the RDD
+   * @param partitionSchema target table's partition schema
+   * @param tableSchema     target table's schema
+   * @param requiredSchema  projected schema required by the reader
+   * @param filters         data filters to be applied
+   * @return instance of RDD (implementing [[HoodieUnsafeRDD]])
+   */
+  protected def composeRDD(fileSplits: Seq[FileSplit],
+                           partitionSchema: StructType,
+                           tableSchema: HoodieTableSchema,
+                           requiredSchema: HoodieTableSchema,
+                           filters: Array[Filter]): HoodieUnsafeRDD
+
+  /**
+   * Provided with partition and data filters, collects target file splits to read records from, while
+   * performing pruning if necessary
+   *
+   * @param partitionFilters partition filters to be applied
+   * @param dataFilters data filters to be applied
+   * @return list of [[FileSplit]] to fetch records from
+   */
+  protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSplit]
+
+  protected def listLatestBaseFiles(globbedPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[Path, Seq[FileStatus]] = {
+    val partitionDirs = if (globbedPaths.isEmpty) {
+      fileIndex.listFiles(partitionFilters, dataFilters)
+    } else {
+      val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, globbedPaths)
+      inMemoryFileIndex.listFiles(partitionFilters, dataFilters)
+    }
+
+    val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray)
+    val latestBaseFiles = fsView.getLatestBaseFiles.iterator().asScala.toList.map(_.getFileStatus)
+
+    latestBaseFiles.groupBy(getPartitionPath)
   }
 
-  protected def doBuildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow]
+  protected def convertToExpressions(filters: Array[Filter]): Array[Expression] = {
+    val catalystExpressions = HoodieSparkUtils.convertToCatalystExpressions(filters, tableStructSchema)
+
+    val failedExprs = catalystExpressions.zipWithIndex.filter { case (opt, _) => opt.isEmpty }
+    if (failedExprs.nonEmpty) {
+      val failedFilters = failedExprs.map(p => filters(p._2))
+      logWarning(s"Failed to convert Filters into Catalyst expressions (${failedFilters.map(_.toString)})")
+    }
+
+    catalystExpressions.filter(_.isDefined).map(_.get).toArray
+  }
+
+  /**
+   * Checks whether given expression only references partition columns
+   * (and involves no sub-query)
+   */
+  protected def isPartitionPredicate(condition: Expression): Boolean = {
+    // Validates that the provided names both resolve to the same entity
+    val resolvedNameEquals = sparkSession.sessionState.analyzer.resolver
+
+    condition.references.forall { r => partitionColumns.exists(resolvedNameEquals(r.name, _)) } &&
+      !SubqueryExpression.hasSubquery(condition)
+  }
 
   protected final def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = {
     val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col))
     requestedColumns ++ missing
   }
+
+  private def getPrecombineFieldProperty: Option[String] =
+    Option(tableConfig.getPreCombineField)
+      .orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key)) match {
+      // NOTE: This is required to compensate for cases when empty string is used to stub
+      //       property value to avoid it being set with the default value
+      // TODO(HUDI-3456) cleanup
+      case Some(f) if !StringUtils.isNullOrEmpty(f) => Some(f)
+      case _ => None
+    }
+
+  private def imbueConfigs(sqlContext: SQLContext): Unit = {
+    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true")
+    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "true")
+    // TODO(HUDI-3639) vectorized reader has to be disabled to make sure MORIncrementalRelation is working properly
+    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false")
+  }

Review comment:
       Should this be `spark.sql.parquet.enableVectorizedReader = true`, to enable vectorized-read acceleration?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org