You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/06/06 22:10:23 UTC

[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5737: [HUDI-4178] Addressing performance regressions in Spark DataSourceV2 Integration

alexeykudinkin commented on code in PR #5737:
URL: https://github.com/apache/hudi/pull/5737#discussion_r890608055


##########
hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/ResolveHudiAlterTableCommandSpark32.scala:
##########
@@ -33,33 +32,37 @@ import org.apache.spark.sql.hudi.command.{AlterTableCommand => HudiAlterTableCom
   */
 class ResolveHudiAlterTableCommandSpark32(sparkSession: SparkSession) extends Rule[LogicalPlan] {
 
-  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
-    case set @ SetTableProperties(asTable(table), _) if schemaEvolutionEnabled && set.resolved =>
-      HudiAlterTableCommand(table, set.changes, ColumnChangeID.PROPERTY_CHANGE)
-    case unSet @ UnsetTableProperties(asTable(table), _, _) if schemaEvolutionEnabled && unSet.resolved =>
-      HudiAlterTableCommand(table, unSet.changes, ColumnChangeID.PROPERTY_CHANGE)
-    case drop @ DropColumns(asTable(table), _) if schemaEvolutionEnabled && drop.resolved =>
-      HudiAlterTableCommand(table, drop.changes, ColumnChangeID.DELETE)
-    case add @ AddColumns(asTable(table), _) if schemaEvolutionEnabled  && add.resolved =>
-      HudiAlterTableCommand(table, add.changes, ColumnChangeID.ADD)
-    case renameColumn @ RenameColumn(asTable(table), _, _) if schemaEvolutionEnabled && renameColumn.resolved=>
-      HudiAlterTableCommand(table, renameColumn.changes, ColumnChangeID.UPDATE)
-    case alter @ AlterColumn(asTable(table), _, _, _, _, _) if schemaEvolutionEnabled && alter.resolved =>
-      HudiAlterTableCommand(table, alter.changes, ColumnChangeID.UPDATE)
-    case replace @ ReplaceColumns(asTable(table), _) if schemaEvolutionEnabled && replace.resolved =>
-      HudiAlterTableCommand(table, replace.changes, ColumnChangeID.REPLACE)
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    if (schemaEvolutionEnabled) {

Review Comment:
   The only change here is moving the conditional up to the top of the method (there were other changes, but they were rolled back).



##########
hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala:
##########
@@ -153,56 +172,53 @@ class HoodieCatalog extends DelegatingCatalogExtension
   @throws[TableAlreadyExistsException]
   override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
     loadTable(oldIdent) match {
-      case _: HoodieInternalV2Table =>
+      case HoodieV1OrV2Table(_) =>
         AlterHoodieTableRenameCommand(oldIdent.asTableIdentifier, newIdent.asTableIdentifier, false).run(spark)
       case _ => super.renameTable(oldIdent, newIdent)
     }
   }
 
   override def alterTable(ident: Identifier, changes: TableChange*): Table = {
-    val tableIdent = TableIdentifier(ident.name(), ident.namespace().lastOption)
-    // scalastyle:off
-    val table = loadTable(ident) match {
-      case hoodieTable: HoodieInternalV2Table => hoodieTable
-      case _ => return super.alterTable(ident, changes: _*)
-    }
-    // scalastyle:on
-
-    val grouped = changes.groupBy(c => c.getClass)
-
-    grouped.foreach {
-      case (t, newColumns) if t == classOf[AddColumn] =>
-        AlterHoodieTableAddColumnsCommand(
-          tableIdent,
-          newColumns.asInstanceOf[Seq[AddColumn]].map { col =>
-            StructField(
-              col.fieldNames()(0),
-              col.dataType(),
-              col.isNullable)
-          }).run(spark)
-      case (t, columnChanges) if classOf[ColumnChange].isAssignableFrom(t) =>
-        columnChanges.foreach {
-          case dataType: UpdateColumnType =>
-            val colName = UnresolvedAttribute(dataType.fieldNames()).name
-            val newDataType = dataType.newDataType()
-            val structField = StructField(colName, newDataType)
-            AlterHoodieTableChangeColumnCommand(tableIdent, colName, structField).run(spark)
-          case dataType: UpdateColumnComment =>
-            val newComment = dataType.newComment()
-            val colName = UnresolvedAttribute(dataType.fieldNames()).name
-            val fieldOpt = table.schema().findNestedField(dataType.fieldNames(), includeCollections = true,
-              spark.sessionState.conf.resolver).map(_._2)
-            val field = fieldOpt.getOrElse {
-              throw new AnalysisException(
-                s"Couldn't find column $colName in:\n${table.schema().treeString}")
+    loadTable(ident) match {

Review Comment:
   No functional changes to this code; just fixing linter warnings.



##########
hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala:
##########
@@ -105,12 +106,30 @@ class HoodieCatalog extends DelegatingCatalogExtension
           case _ =>
             catalogTable0
         }
-        HoodieInternalV2Table(
+
+        val v2Table = HoodieInternalV2Table(
           spark = spark,
           path = catalogTable.location.toString,
           catalogTable = Some(catalogTable),
           tableIdentifier = Some(ident.toString))
-      case o => o
+
+        val schemaEvolutionEnabled: Boolean = spark.sessionState.conf.getConfString(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
+          DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
+
+        // NOTE: PLEASE READ CAREFULLY
+        //
+        // Since Hudi relations don't currently implement DS V2 Read API, we by default fallback to V1 here.
+        // Such fallback will have considerable performance impact, therefore it's only performed in cases
+        // where V2 API have to be used. Currently only such use-case is using of Schema Evolution feature
+        //
+        // Check out HUDI-4178 for more details
+        if (schemaEvolutionEnabled) {
+          v2Table
+        } else {
+          v2Table.v1TableWrapper

Review Comment:
   Correct. This and `Spark3DefaultSource`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org