You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/03/15 02:46:18 UTC

[GitHub] [hudi] YannByron commented on a change in pull request #4877: [HUDI-3457] Refactored Spark DataSource Relations to avoid code duplication

YannByron commented on a change in pull request #4877:
URL: https://github.com/apache/hudi/pull/4877#discussion_r826534174



##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
##########
@@ -130,22 +158,110 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
    * NOTE: DO NOT OVERRIDE THIS METHOD
    */
   override final def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    // NOTE: In case list of requested columns doesn't contain the Primary Key one, we
+    //       have to add it explicitly so that
+    //          - Merging could be performed correctly
+    //          - In case 0 columns are to be fetched (for ex, when doing {@code count()} on Spark's [[Dataset]],
+    //          Spark still fetches all the rows to execute the query correctly
+    //
+    //       It's okay to return columns that have not been requested by the caller, as those nevertheless will be
+    //       filtered out upstream
+    val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
+
+    val (requiredAvroSchema, requiredStructSchema) =
+      HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns)
+
+    val filterExpressions = convertToExpressions(filters)
+    val (partitionFilters, dataFilters) = filterExpressions.partition(isPartitionPredicate)
+
+    val fileSplits = collectFileSplits(partitionFilters, dataFilters)
+
+    val partitionSchema = StructType(Nil)
+    val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchema.toString)
+    val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString)
+
     // Here we rely on a type erasure, to workaround inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
     // Please check [[needConversion]] scala-doc for more details
-    doBuildScan(requiredColumns, filters).asInstanceOf[RDD[Row]]
+    composeRDD(fileSplits, partitionSchema, tableSchema, requiredSchema, filters).asInstanceOf[RDD[Row]]
   }
 
-  protected def doBuildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow]
+  // TODO scala-doc
+  protected def composeRDD(fileSplits: Seq[FileSplit],
+                           partitionSchema: StructType,
+                           tableSchema: HoodieTableSchema,
+                           requiredSchema: HoodieTableSchema,
+                           filters: Array[Filter]): HoodieUnsafeRDD
+
+  // TODO scala-doc
+  protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSplit]
+
+  protected def listLatestBaseFiles(globPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[Path, Seq[FileStatus]] = {
+    if (globPaths.isEmpty) {
+      val partitionDirs = fileIndex.listFiles(partitionFilters, dataFilters)
+      partitionDirs.map(pd => (getPartitionPath(pd.files.head), pd.files)).toMap
+    } else {
+      val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sparkSession, globPaths)
+      val partitionDirs = inMemoryFileIndex.listFiles(partitionFilters, dataFilters)
+
+      val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray)
+      val latestBaseFiles = fsView.getLatestBaseFiles.iterator().asScala.toList.map(_.getFileStatus)
+
+      latestBaseFiles.groupBy(getPartitionPath)
+    }
+  }
+
+  protected def convertToExpressions(filters: Array[Filter]): Array[Expression] = {
+    val catalystExpressions = HoodieSparkUtils.convertToCatalystExpressions(filters, tableStructSchema)
+
+    val failedExprs = catalystExpressions.zipWithIndex.filter { case (opt, _) => opt.isEmpty }
+    if (failedExprs.nonEmpty) {
+      val failedFilters = failedExprs.map(p => filters(p._2))
+      logWarning(s"Failed to convert Filters into Catalyst expressions (${failedFilters.map(_.toString)})")
+    }
+
+    catalystExpressions.filter(_.isDefined).map(_.get).toArray
+  }
+
+  /**
+   * Checks whether given expression only references partition columns
+   * (and involves no sub-query)
+   */
+  protected def isPartitionPredicate(condition: Expression): Boolean = {
+    // Validates that the provided names both resolve to the same entity
+    val resolvedNameEquals = sparkSession.sessionState.analyzer.resolver
+
+    condition.references.forall { r => partitionColumns.exists(resolvedNameEquals(r.name, _)) } &&
+      !SubqueryExpression.hasSubquery(condition)
+  }
 
   protected final def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = {
     val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col))
     requestedColumns ++ missing
   }
+
+  private def getPrecombineFieldProperty: Option[String] =
+    Option(tableConfig.getPreCombineField)
+      .orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key)) match {
+      // NOTE: This is required to compensate for cases when empty string is used to stub
+      //       property value to avoid it being set with the default value
+      // TODO(HUDI-3456) cleanup
+      case Some(f) if !StringUtils.isNullOrEmpty(f) => Some(f)
+      case _ => None
+    }
+
+  private def imbueConfigs(sqlContext: SQLContext): Unit = {
+    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true")
+    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "true")
+    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "true")
+  }

Review comment:
       yep. `enableVectorizedReader` was false. but as discussed with @alexeykudinkin before, we need to enable this to speed up.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org