Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/07/28 05:21:40 UTC

[GitHub] [hudi] nsivabalan commented on a diff in pull request #6213: [HUDI-4081][HUDI-4472] Addressing Spark SQL vs Spark DS performance gap

nsivabalan commented on code in PR #6213:
URL: https://github.com/apache/hudi/pull/6213#discussion_r931737037


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -241,39 +240,49 @@ object HoodieSparkSqlWriter {
             sparkContext.getConf.registerKryoClasses(
               Array(classOf[org.apache.avro.generic.GenericData],
                 classOf[org.apache.avro.Schema]))
-            var schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
-            val lastestSchema = getLatestTableSchema(fs, basePath, sparkContext, schema)
+
+            // TODO(HUDI-4472) revisit and simplify schema handling
+            val sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
+            val latestTableSchema = getLatestTableSchema(fs, basePath, sparkContext).getOrElse(sourceSchema)
+
+            val enabledSchemaEvolution = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean

Review Comment:
   Should we use DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue instead of the explicit "false"?
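   For reference, a minimal sketch of what that could look like (assuming `SCHEMA_EVOLUTION_ENABLED.defaultValue()` carries the same "false" default that is hard-coded today; the `String.valueOf` wrapper is only needed if that default is not already a String):

       val enabledSchemaEvolution = parameters.getOrDefault(
           DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(),
           // hypothetical: fall back to the config's own default instead of a literal "false"
           String.valueOf(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue())
         ).toBoolean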



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala:
##########
@@ -66,100 +79,139 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig {
    * @param extraOptions Extra options for insert.
    */
   def run(sparkSession: SparkSession,
-      table: CatalogTable,
-      query: LogicalPlan,
-      insertPartitions: Map[String, Option[String]],
-      overwrite: Boolean,
-      refreshTable: Boolean = true,
-      extraOptions: Map[String, String] = Map.empty): Boolean = {
-
-    val hoodieCatalogTable = new HoodieCatalogTable(sparkSession, table)
-    val config = buildHoodieInsertConfig(hoodieCatalogTable, sparkSession, overwrite, insertPartitions, extraOptions)
-
-    val mode = if (overwrite && hoodieCatalogTable.partitionFields.isEmpty) {
-      // insert overwrite non-partition table
+          table: CatalogTable,
+          query: LogicalPlan,
+          partitionSpec: Map[String, Option[String]],
+          overwrite: Boolean,
+          refreshTable: Boolean = true,
+          extraOptions: Map[String, String] = Map.empty): Boolean = {
+    val catalogTable = new HoodieCatalogTable(sparkSession, table)
+    val config = buildHoodieInsertConfig(catalogTable, sparkSession, overwrite, partitionSpec, extraOptions)
+
+    // NOTE: In case of partitioned table we override specified "overwrite" parameter
+    //       to instead append to the dataset
+    val mode = if (overwrite && catalogTable.partitionFields.isEmpty) {
       SaveMode.Overwrite
     } else {
-      // for insert into or insert overwrite partition we use append mode.
       SaveMode.Append
     }
-    val conf = sparkSession.sessionState.conf
-    val alignedQuery = alignOutputFields(query, hoodieCatalogTable, insertPartitions, conf)
-    // If we create dataframe using the Dataset.ofRows(sparkSession, alignedQuery),
-    // The nullable attribute of fields will lost.
-    // In order to pass the nullable attribute to the inputDF, we specify the schema
-    // of the rdd.
-    val inputDF = sparkSession.createDataFrame(
-      Dataset.ofRows(sparkSession, alignedQuery).rdd, alignedQuery.schema)
-    val success =
-      HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, inputDF)._1
-    if (success) {
-      if (refreshTable) {
-        sparkSession.catalog.refreshTable(table.identifier.unquotedString)
-      }
-      true
-    } else {
-      false
+
+    val alignedQuery = alignQueryOutput(query, catalogTable, partitionSpec, sparkSession.sessionState.conf)
+
+    val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, Dataset.ofRows(sparkSession, alignedQuery))
+
+    if (success && refreshTable) {
+      sparkSession.catalog.refreshTable(table.identifier.unquotedString)
     }
+
+    success
   }
 
   /**
-   * Aligned the type and name of query's output fields with the result table's fields.
-   * @param query The insert query which to aligned.
-   * @param hoodieCatalogTable The result hoodie catalog table.
-   * @param insertPartitions The insert partition map.
-   * @param conf The SQLConf.
-   * @return
+   * Align provided [[query]]'s output with the expected [[catalogTable]] schema by
+   *
+   * <ul>
+   *   <li>Performing type coercion (casting corresponding outputs, where needed)</li>
+   *   <li>Adding aliases (matching column names) to corresponding outputs </li>
+   * </ul>
+   *
+   * @param query target query whose output is to be inserted
+   * @param catalogTable catalog table
+   * @param partitionsSpec partition spec specifying static/dynamic partition values
+   * @param conf Spark's [[SQLConf]]
    */
-  private def alignOutputFields(
-    query: LogicalPlan,
-    hoodieCatalogTable: HoodieCatalogTable,
-    insertPartitions: Map[String, Option[String]],
-    conf: SQLConf): LogicalPlan = {
-
-    val targetPartitionSchema = hoodieCatalogTable.partitionSchema
-
-    val staticPartitionValues = insertPartitions.filter(p => p._2.isDefined).mapValues(_.get)
-    assert(staticPartitionValues.isEmpty ||
-      insertPartitions.size == targetPartitionSchema.size,
-      s"Required partition columns is: ${targetPartitionSchema.json}, Current input partitions " +
-        s"is: ${staticPartitionValues.mkString("," + "")}")
-
-    val queryOutputWithoutMetaFields = removeMetaFields(query.output)
-    assert(staticPartitionValues.size + queryOutputWithoutMetaFields.size
-      == hoodieCatalogTable.tableSchemaWithoutMetaFields.size,
-      s"Required select columns count: ${hoodieCatalogTable.tableSchemaWithoutMetaFields.size}, " +
-        s"Current select columns(including static partition column) count: " +
-        s"${staticPartitionValues.size + queryOutputWithoutMetaFields.size},columns: " +
-        s"(${(queryOutputWithoutMetaFields.map(_.name) ++ staticPartitionValues.keys).mkString(",")})")
-
-    val dataAndDynamicPartitionSchemaWithoutMetaFields = StructType(
-      hoodieCatalogTable.tableSchemaWithoutMetaFields.filterNot(f => staticPartitionValues.contains(f.name)))
-    val dataProjectsWithoutMetaFields = getTableFieldsAlias(queryOutputWithoutMetaFields,
-      dataAndDynamicPartitionSchemaWithoutMetaFields.fields, conf)
-
-    val partitionProjects = targetPartitionSchema.fields.filter(f => staticPartitionValues.contains(f.name))
-      .map(f => {
-        val staticPartitionValue = staticPartitionValues.getOrElse(f.name,
-          s"Missing static partition value for: ${f.name}")
-        val castAttr = castIfNeeded(Literal.create(staticPartitionValue), f.dataType, conf)
-        Alias(castAttr, f.name)()
+  private def alignQueryOutput(query: LogicalPlan,
+                               catalogTable: HoodieCatalogTable,
+                               partitionsSpec: Map[String, Option[String]],
+                               conf: SQLConf): LogicalPlan = {
+
+    val targetPartitionSchema = catalogTable.partitionSchema
+    val staticPartitionValues = filterStaticPartitionValues(partitionsSpec)
+
+    validate(removeMetaFields(query.schema), partitionsSpec, catalogTable)
+    // Make sure we strip out meta-fields from the incoming dataset (these will have to be discarded anyway)
+    val cleanedQuery = stripMetaFields(query)
+    // To validate and align properly output of the query, we simply filter out partition columns with already
+    // provided static values from the table's schema
+    //
+    // NOTE: This is a crucial step: since coercion might rely on either of a) name-based or b) positional-based
+    //       matching it's important to strip out partition columns, having static values provided in the partition spec,
+    //       since such columns wouldn't be otherwise specified w/in the query itself and therefore couldn't be matched
+    //       positionally for example
+    val expectedQueryColumns = catalogTable.tableSchemaWithoutMetaFields.filterNot(f => staticPartitionValues.contains(f.name))
+    val coercedQueryOutput = coerceQueryOutputColumns(StructType(expectedQueryColumns), cleanedQuery, catalogTable, conf)
+
+    val staticPartitionValuesExprs = createStaticPartitionValuesExpressions(staticPartitionValues, targetPartitionSchema, conf)
+
+    Project(coercedQueryOutput.output ++ staticPartitionValuesExprs, coercedQueryOutput)
+  }
+
+  private def coerceQueryOutputColumns(expectedSchema: StructType,
+                                       query: LogicalPlan,
+                                       catalogTable: HoodieCatalogTable,
+                                       conf: SQLConf): LogicalPlan = {
+    val planUtils = sparkAdapter.getCatalystPlanUtils
+    try {
+      planUtils.resolveOutputColumns(catalogTable.catalogTableName, expectedSchema.toAttributes, query, byName = true, conf)
+    } catch {
+      // NOTE: In case matching by name didn't match the query output, we will attempt positional matching
+      case ae: AnalysisException if ae.getMessage().startsWith("Cannot write incompatible data to table") =>
+        planUtils.resolveOutputColumns(catalogTable.catalogTableName, expectedSchema.toAttributes, query, byName = false, conf)
+    }

Review Comment:
   Do we throw here if an exception other than `AnalysisException` is thrown? It looks like we swallow it silently; can we fix that?
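   If the goal is to make sure nothing else can get masked here, one possible shape would be an explicit catch-all that rethrows with context (a sketch only, not what the patch currently does; wrapping into a `HoodieException` is an assumption):

       import scala.util.control.NonFatal

       try {
         planUtils.resolveOutputColumns(catalogTable.catalogTableName, expectedSchema.toAttributes, query, byName = true, conf)
       } catch {
         // Name-based matching failed, so fall back to positional matching
         case ae: AnalysisException if ae.getMessage().startsWith("Cannot write incompatible data to table") =>
           planUtils.resolveOutputColumns(catalogTable.catalogTableName, expectedSchema.toAttributes, query, byName = false, conf)
         // Anything else is unexpected: surface it with context rather than letting it escape unnoticed
         case NonFatal(e) =>
           throw new HoodieException(s"Failed to resolve output columns for ${catalogTable.catalogTableName}", e)
       }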



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -241,39 +240,49 @@ object HoodieSparkSqlWriter {
             sparkContext.getConf.registerKryoClasses(
               Array(classOf[org.apache.avro.generic.GenericData],
                 classOf[org.apache.avro.Schema]))
-            var schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
-            val lastestSchema = getLatestTableSchema(fs, basePath, sparkContext, schema)
+
+            // TODO(HUDI-4472) revisit and simplify schema handling
+            val sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
+            val latestTableSchema = getLatestTableSchema(fs, basePath, sparkContext).getOrElse(sourceSchema)
+
+            val enabledSchemaEvolution = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean

Review Comment:
   Minor: maybe "schemaEvolutionEnabled" would read better than "enabledSchemaEvolution".



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -241,39 +240,49 @@ object HoodieSparkSqlWriter {
             sparkContext.getConf.registerKryoClasses(
               Array(classOf[org.apache.avro.generic.GenericData],
                 classOf[org.apache.avro.Schema]))
-            var schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
-            val lastestSchema = getLatestTableSchema(fs, basePath, sparkContext, schema)
+
+            // TODO(HUDI-4472) revisit and simplify schema handling
+            val sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
+            val latestTableSchema = getLatestTableSchema(fs, basePath, sparkContext).getOrElse(sourceSchema)
+
+            val enabledSchemaEvolution = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean
             var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext)
-            if (reconcileSchema && parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean
-              && internalSchemaOpt.isEmpty) {
-              // force apply full schema evolution.
-              internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(schema))
-            }
-            if (reconcileSchema) {
-              schema = lastestSchema
-            }
-            if (internalSchemaOpt.isDefined) {
-              // Apply schema evolution.
-              val mergedSparkSchema = if (!reconcileSchema) {
-                AvroConversionUtils.convertAvroSchemaToStructType(AvroSchemaEvolutionUtils.canonicalizeColumnNullability(schema, lastestSchema))
+
+            val writerSchema: Schema =
+              if (reconcileSchema) {
+                // In case we need to reconcile the schema and schema evolution is enabled,
+                // we will force-apply schema evolution to the writer's schema
+                if (enabledSchemaEvolution && internalSchemaOpt.isEmpty) {
+                  internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(sourceSchema))
+                }
+
+                if (internalSchemaOpt.isDefined) {
+                  // Apply schema evolution, by auto-merging write schema and read schema
+                  val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(sourceSchema, internalSchemaOpt.get)
+                  AvroInternalSchemaConverter.convert(mergedInternalSchema, latestTableSchema.getName)
+                } else if (TableSchemaResolver.isSchemaCompatible(sourceSchema, latestTableSchema)) {

Review Comment:
   So essentially, this schema reconciliation is not handled well in the schema evolution code path? Maybe you could file a tracking ticket.



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala:
##########
@@ -66,100 +79,139 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig {
    * @param extraOptions Extra options for insert.
    */
   def run(sparkSession: SparkSession,
-      table: CatalogTable,
-      query: LogicalPlan,
-      insertPartitions: Map[String, Option[String]],
-      overwrite: Boolean,
-      refreshTable: Boolean = true,
-      extraOptions: Map[String, String] = Map.empty): Boolean = {
-
-    val hoodieCatalogTable = new HoodieCatalogTable(sparkSession, table)
-    val config = buildHoodieInsertConfig(hoodieCatalogTable, sparkSession, overwrite, insertPartitions, extraOptions)
-
-    val mode = if (overwrite && hoodieCatalogTable.partitionFields.isEmpty) {
-      // insert overwrite non-partition table
+          table: CatalogTable,
+          query: LogicalPlan,
+          partitionSpec: Map[String, Option[String]],
+          overwrite: Boolean,
+          refreshTable: Boolean = true,
+          extraOptions: Map[String, String] = Map.empty): Boolean = {
+    val catalogTable = new HoodieCatalogTable(sparkSession, table)
+    val config = buildHoodieInsertConfig(catalogTable, sparkSession, overwrite, partitionSpec, extraOptions)
+
+    // NOTE: In case of partitioned table we override specified "overwrite" parameter
+    //       to instead append to the dataset
+    val mode = if (overwrite && catalogTable.partitionFields.isEmpty) {
       SaveMode.Overwrite
     } else {
-      // for insert into or insert overwrite partition we use append mode.
       SaveMode.Append
     }
-    val conf = sparkSession.sessionState.conf
-    val alignedQuery = alignOutputFields(query, hoodieCatalogTable, insertPartitions, conf)
-    // If we create dataframe using the Dataset.ofRows(sparkSession, alignedQuery),
-    // The nullable attribute of fields will lost.
-    // In order to pass the nullable attribute to the inputDF, we specify the schema
-    // of the rdd.
-    val inputDF = sparkSession.createDataFrame(
-      Dataset.ofRows(sparkSession, alignedQuery).rdd, alignedQuery.schema)
-    val success =
-      HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, inputDF)._1
-    if (success) {
-      if (refreshTable) {
-        sparkSession.catalog.refreshTable(table.identifier.unquotedString)
-      }
-      true
-    } else {
-      false
+
+    val alignedQuery = alignQueryOutput(query, catalogTable, partitionSpec, sparkSession.sessionState.conf)
+
+    val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, Dataset.ofRows(sparkSession, alignedQuery))
+
+    if (success && refreshTable) {
+      sparkSession.catalog.refreshTable(table.identifier.unquotedString)
     }
+
+    success
   }
 
   /**
-   * Aligned the type and name of query's output fields with the result table's fields.
-   * @param query The insert query which to aligned.
-   * @param hoodieCatalogTable The result hoodie catalog table.
-   * @param insertPartitions The insert partition map.
-   * @param conf The SQLConf.
-   * @return
+   * Align provided [[query]]'s output with the expected [[catalogTable]] schema by
+   *
+   * <ul>
+   *   <li>Performing type coercion (casting corresponding outputs, where needed)</li>
+   *   <li>Adding aliases (matching column names) to corresponding outputs </li>
+   * </ul>
+   *
+   * @param query target query whose output is to be inserted
+   * @param catalogTable catalog table
+   * @param partitionsSpec partition spec specifying static/dynamic partition values
+   * @param conf Spark's [[SQLConf]]
    */
-  private def alignOutputFields(
-    query: LogicalPlan,
-    hoodieCatalogTable: HoodieCatalogTable,
-    insertPartitions: Map[String, Option[String]],
-    conf: SQLConf): LogicalPlan = {
-
-    val targetPartitionSchema = hoodieCatalogTable.partitionSchema
-
-    val staticPartitionValues = insertPartitions.filter(p => p._2.isDefined).mapValues(_.get)
-    assert(staticPartitionValues.isEmpty ||
-      insertPartitions.size == targetPartitionSchema.size,
-      s"Required partition columns is: ${targetPartitionSchema.json}, Current input partitions " +
-        s"is: ${staticPartitionValues.mkString("," + "")}")
-
-    val queryOutputWithoutMetaFields = removeMetaFields(query.output)
-    assert(staticPartitionValues.size + queryOutputWithoutMetaFields.size
-      == hoodieCatalogTable.tableSchemaWithoutMetaFields.size,
-      s"Required select columns count: ${hoodieCatalogTable.tableSchemaWithoutMetaFields.size}, " +
-        s"Current select columns(including static partition column) count: " +
-        s"${staticPartitionValues.size + queryOutputWithoutMetaFields.size},columns: " +
-        s"(${(queryOutputWithoutMetaFields.map(_.name) ++ staticPartitionValues.keys).mkString(",")})")
-
-    val dataAndDynamicPartitionSchemaWithoutMetaFields = StructType(
-      hoodieCatalogTable.tableSchemaWithoutMetaFields.filterNot(f => staticPartitionValues.contains(f.name)))
-    val dataProjectsWithoutMetaFields = getTableFieldsAlias(queryOutputWithoutMetaFields,
-      dataAndDynamicPartitionSchemaWithoutMetaFields.fields, conf)
-
-    val partitionProjects = targetPartitionSchema.fields.filter(f => staticPartitionValues.contains(f.name))
-      .map(f => {
-        val staticPartitionValue = staticPartitionValues.getOrElse(f.name,
-          s"Missing static partition value for: ${f.name}")
-        val castAttr = castIfNeeded(Literal.create(staticPartitionValue), f.dataType, conf)
-        Alias(castAttr, f.name)()
+  private def alignQueryOutput(query: LogicalPlan,
+                               catalogTable: HoodieCatalogTable,
+                               partitionsSpec: Map[String, Option[String]],
+                               conf: SQLConf): LogicalPlan = {
+
+    val targetPartitionSchema = catalogTable.partitionSchema
+    val staticPartitionValues = filterStaticPartitionValues(partitionsSpec)
+
+    validate(removeMetaFields(query.schema), partitionsSpec, catalogTable)
+    // Make sure we strip out meta-fields from the incoming dataset (these will have to be discarded anyway)
+    val cleanedQuery = stripMetaFields(query)
+    // To validate and align properly output of the query, we simply filter out partition columns with already
+    // provided static values from the table's schema
+    //
+    // NOTE: This is a crucial step: since coercion might rely on either of a) name-based or b) positional-based
+    //       matching it's important to strip out partition columns, having static values provided in the partition spec,
+    //       since such columns wouldn't be otherwise specified w/in the query itself and therefore couldn't be matched
+    //       positionally for example
+    val expectedQueryColumns = catalogTable.tableSchemaWithoutMetaFields.filterNot(f => staticPartitionValues.contains(f.name))
+    val coercedQueryOutput = coerceQueryOutputColumns(StructType(expectedQueryColumns), cleanedQuery, catalogTable, conf)
+
+    val staticPartitionValuesExprs = createStaticPartitionValuesExpressions(staticPartitionValues, targetPartitionSchema, conf)
+
+    Project(coercedQueryOutput.output ++ staticPartitionValuesExprs, coercedQueryOutput)
+  }
+
+  private def coerceQueryOutputColumns(expectedSchema: StructType,
+                                       query: LogicalPlan,
+                                       catalogTable: HoodieCatalogTable,
+                                       conf: SQLConf): LogicalPlan = {
+    val planUtils = sparkAdapter.getCatalystPlanUtils
+    try {
+      planUtils.resolveOutputColumns(catalogTable.catalogTableName, expectedSchema.toAttributes, query, byName = true, conf)
+    } catch {
+      // NOTE: In case matching by name didn't match the query output, we will attempt positional matching
+      case ae: AnalysisException if ae.getMessage().startsWith("Cannot write incompatible data to table") =>
+        planUtils.resolveOutputColumns(catalogTable.catalogTableName, expectedSchema.toAttributes, query, byName = false, conf)
+    }
+  }
+
+  private def validate(queryOutputSchema: StructType, partitionsSpec: Map[String, Option[String]], catalogTable: HoodieCatalogTable): Unit = {
+    // Validate that partition-spec has proper format (it could be empty if all of the partition values are dynamic,
+    // ie there are no static partition-values specified)
+    if (partitionsSpec.nonEmpty && partitionsSpec.size != catalogTable.partitionSchema.size) {
+      throw new HoodieException(s"Required partition schema is: ${catalogTable.partitionSchema.fieldNames.mkString("[", ", ", "]")}, " +
+        s"partition spec is: ${partitionsSpec.mkString("[", ", ", "]")}")
+    }
+
+    val staticPartitionValues = filterStaticPartitionValues(partitionsSpec)
+    val fullQueryOutputSchema = StructType(queryOutputSchema.fields ++ staticPartitionValues.keys.map(StructField(_, StringType)))
+
+    // Assert that query provides all the required columns
+    if (!conforms(fullQueryOutputSchema, catalogTable.tableSchemaWithoutMetaFields)) {
+      throw new HoodieException(s"Expected table's schema: ${catalogTable.tableSchemaWithoutMetaFields.fields.mkString("[", ", ", "]")}, " +
+        s"query's output (including static partition values): ${fullQueryOutputSchema.fields.mkString("[", ", ", "]")}")
+    }
+  }
+
+  private def createStaticPartitionValuesExpressions(staticPartitionValues: Map[String, String],
+                                                     partitionSchema: StructType,
+                                                     conf: SQLConf): Seq[NamedExpression] = {
+    partitionSchema.fields
+      .filter(pf => staticPartitionValues.contains(pf.name))
+      .map(pf => {
+        val staticPartitionValue = staticPartitionValues(pf.name)
+        val castExpr = castIfNeeded(Literal.create(staticPartitionValue), pf.dataType, conf)
+
+        Alias(castExpr, pf.name)()
       })
+  }
 
-    Project(dataProjectsWithoutMetaFields ++ partitionProjects, query)
+  private def conforms(sourceSchema: StructType, targetSchema: StructType): Boolean = {
+    if (sourceSchema.fields.length != targetSchema.fields.length) {
+      false
+    } else {
+      targetSchema.fields.zip(sourceSchema).forall {
+        case (targetColumn, sourceColumn) =>
+          // Make sure we can cast source column to the target column type
+          Cast.canCast(sourceColumn.dataType, targetColumn.dataType)

Review Comment:
   Just for my understanding: this validation did not exist before this patch and we are adding it newly, right?
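   (For context, `Cast.canCast` is Spark Catalyst's own compatibility check, which `conforms` delegates to; a quick standalone illustration, not code from the patch:)

       import org.apache.spark.sql.catalyst.expressions.Cast
       import org.apache.spark.sql.types.{ArrayType, IntegerType, LongType, StringType}

       Cast.canCast(IntegerType, LongType)              // true: widening numeric cast is allowed
       Cast.canCast(ArrayType(StringType), IntegerType) // false: a complex type cannot be cast to a scalar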



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org