Posted to commits@hudi.apache.org by "alexeykudinkin (via GitHub)" <gi...@apache.org> on 2023/02/06 20:53:55 UTC

[GitHub] [hudi] alexeykudinkin opened a new pull request, #7871: [HUDI-4690][HUDI-4503] Cleaning up Hudi custom Spark `Rule`s

alexeykudinkin opened a new pull request, #7871:
URL: https://github.com/apache/hudi/pull/7871

   ### Change Logs
   
   NOTE: This is a squashed version of the original PR #6361
   
   This PR cleans up a considerable amount of Spark's (internal) resolution logic that has been copied over into Hudi components even though there is no actual need for it. Instead, we can rely on Spark itself to resolve most of these constructs and plug in our custom implementation (post-hoc) after resolution has been performed.
   
   Issues this will be addressing (among others): 
    - HUDI-4503 
    - HUDI-4690
    - HUDI-4872
    - HUDI-4861
   
   
   Changelog:
    - 
   
   ### Impact
   
   Resolves many Spark SQL issues stemming from the inconsistent implementation of Hudi's bespoke resolution rules.
   
   ### Risk level (write none, low medium or high below)
   
   _If medium or high, explain what verification was done to mitigate the risks._
   
   ### Documentation Update
   
   _Describe any necessary documentation update if there is any new feature, config, or user-facing change_
   
   - _The config description must be updated if new configs are added or the default value of the configs are changed_
   - _Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the
     ticket number here and follow the [instruction](https://hudi.apache.org/contribute/developer-setup#website) to make
     changes to the website._
   
   ### Contributor's checklist
   
   - [ ] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [ ] Change Logs and Impact were stated clearly
   - [ ] Adequate tests were added if applicable
   - [ ] CI passed
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #7871: [HUDI-4690][HUDI-4503] Cleaning up Hudi custom Spark `Rule`s

Posted by "alexeykudinkin (via GitHub)" <gi...@apache.org>.
alexeykudinkin commented on code in PR #7871:
URL: https://github.com/apache/hudi/pull/7871#discussion_r1104995855


##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -28,97 +28,125 @@ import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, TB
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.sync.common.HoodieSyncConfig
+import org.apache.hudi.util.JFunction.scalaFunction1Noop
 import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, SparkAdapterSupport}
-import org.apache.spark.sql.HoodieCatalystExpressionUtils.MatchCast
+import org.apache.spark.sql.HoodieCatalystExpressionUtils.{MatchCast, attributeEquals}
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, Cast, EqualTo, Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReference
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, EqualTo, Expression, Literal, NamedExpression, PredicateHelper}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
-import org.apache.spark.sql.hudi.HoodieSqlUtils.getMergeIntoTargetTableId
+import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.failAnalysis
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig.combineOptions
-import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.CoercedAttributeReference
+import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.{CoercedAttributeReference, encodeAsBase64String, stripCasting, toStructType}
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig
-import org.apache.spark.sql.types.{BooleanType, StructType}
+import org.apache.spark.sql.types.{BooleanType, StructField, StructType}
 
 import java.util.Base64
 
 /**
- * The Command for hoodie MergeIntoTable.
- * The match on condition must contain the row key fields currently, so that we can use Hoodie
- * Index to speed up the performance.
+ * Hudi's implementation of the {@code MERGE INTO} (MIT) Spark SQL statement.
  *
- * The main algorithm:
+ * NOTE: This implementation is restricted in some aspects to accommodate Hudi's crucial
+ *       constraint (of requiring every record to bear a unique primary-key): the merging condition ([[mergeCondition]])
+ *       currently can only (and must) reference the target table's primary-key columns (this is necessary to
+ *       leverage Hudi's upserting capabilities, including Indexes)
  *
- * We pushed down all the matched and not matched (condition, assignment) expression pairs to the
- * ExpressionPayload. And the matched (condition, assignment) expression pairs will execute in the
- * ExpressionPayload#combineAndGetUpdateValue to compute the result record, while the not matched
- * expression pairs will execute in the ExpressionPayload#getInsertValue.
+ * Following algorithm is applied:
  *
- * For Mor table, it is a litter complex than this. The matched record also goes through the getInsertValue
- * and write append to the log. So the update actions & insert actions should process by the same
- * way. We pushed all the update actions & insert actions together to the
- * ExpressionPayload#getInsertValue.
+ * <ol>
+ *   <li>Incoming batch ([[sourceTable]]) is reshaped such that it bears correspondingly:
+ *   a) (required) "primary-key" column as well as b) (optional) "pre-combine" column; this is
+ *   required since MIT statements do not restrict the [[sourceTable]]'s schema to be aligned w/ the
+ *   [[targetTable]]'s one, while Hudi's upserting flow expects such columns to be present</li>

Review Comment:
   MIT does not require the source and target tables' schemas to be aligned in any way. In other words, anything can be merged into the target table as long as the fields we're updating have matching data-types.
   
   This is exactly the reason why schema validation is disabled for MIT.





[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #7871: [HUDI-4690][HUDI-4503] Cleaning up Hudi custom Spark `Rule`s

Posted by "alexeykudinkin (via GitHub)" <gi...@apache.org>.
alexeykudinkin commented on code in PR #7871:
URL: https://github.com/apache/hudi/pull/7871#discussion_r1104996074


##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -127,164 +155,189 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
     // target table side (since we're gonna be matching against primary-key column as is) expression
     // on the opposite side of the comparison should be cast-able to the primary-key column's data-type
     // t/h "up-cast" (ie w/o any loss in precision)
-    val target2Source = cleanedConditions.map {
-      case EqualTo(CoercedAttributeReference(attr), expr)
-        if targetAttrs.exists(f => attributeEqual(f, attr, resolver)) =>
-          if (exprUtils.canUpCast(expr.dataType, attr.dataType)) {
-            targetAttrs.find(f => resolver(f.name, attr.name)).get.name ->
-              castIfNeeded(expr, attr.dataType, sparkSession.sqlContext.conf)
-          } else {
-            throw new AnalysisException(s"Invalid MERGE INTO matching condition: ${expr.sql}: "
-              + s"can't cast ${expr.sql} (of ${expr.dataType}) to ${attr.dataType}")
-          }
+    val targetAttr2ConditionExpressions = cleanedConditions.map {
+      case EqualTo(CoercedAttributeReference(attr), expr) if targetAttrs.exists(f => attributeEquals(f, attr)) =>
+        if (exprUtils.canUpCast(expr.dataType, attr.dataType)) {
+          // NOTE: It's critical we reference output attribute here and not the one from condition
+          val targetAttr = targetAttrs.find(f => attributeEquals(f, attr)).get
+          targetAttr -> castIfNeeded(expr, attr.dataType)
+        } else {
+          throw new AnalysisException(s"Invalid MERGE INTO matching condition: ${expr.sql}: "
+            + s"can't cast ${expr.sql} (of ${expr.dataType}) to ${attr.dataType}")
+        }
 
-      case EqualTo(expr, CoercedAttributeReference(attr))
-        if targetAttrs.exists(f => attributeEqual(f, attr, resolver)) =>
-          if (exprUtils.canUpCast(expr.dataType, attr.dataType)) {
-            targetAttrs.find(f => resolver(f.name, attr.name)).get.name ->
-              castIfNeeded(expr, attr.dataType, sparkSession.sqlContext.conf)
-          } else {
-            throw new AnalysisException(s"Invalid MERGE INTO matching condition: ${expr.sql}: "
-              + s"can't cast ${expr.sql} (of ${expr.dataType}) to ${attr.dataType}")
-          }
+      case EqualTo(expr, CoercedAttributeReference(attr)) if targetAttrs.exists(f => attributeEquals(f, attr)) =>
+        if (exprUtils.canUpCast(expr.dataType, attr.dataType)) {
+          // NOTE: It's critical we reference output attribute here and not the one from condition
+          val targetAttr = targetAttrs.find(f => attributeEquals(f, attr)).get
+          targetAttr -> castIfNeeded(expr, attr.dataType)
+        } else {
+          throw new AnalysisException(s"Invalid MERGE INTO matching condition: ${expr.sql}: "
+            + s"can't cast ${expr.sql} (of ${expr.dataType}) to ${attr.dataType}")
+        }
 
       case expr =>
         throw new AnalysisException(s"Invalid MERGE INTO matching condition: `${expr.sql}`: "
           + "expected condition should be 'target.id = <source-column-expr>', e.g. "
           + "`t.id = s.id` or `t.id = cast(s.id, ...)`")
-    }.toMap
+    }
 
-    target2Source
+    targetAttr2ConditionExpressions.collect {
+      case (attr, expr) if resolver(attr.name, primaryKeyField) =>
+        // NOTE: Here we validate that condition expression involving primary-key column(s) is a simple
+        //       attribute-reference expression (possibly wrapped into a cast). This is necessary to disallow
+        //       statements like following
+        //
+        //         MERGE INTO ... AS t USING (
+        //            SELECT ... FROM ... AS s
+        //         )
+        //            ON t.id = s.id + 1
+        //            WHEN MATCHED THEN UPDATE *
+        //
+        //       Which (in the current design) could result in a primary key of the record being modified,
+        //       which is not allowed.
+        if (!resolvesToSourceAttribute(expr)) {
+          throw new AnalysisException("Only simple conditions of the form `t.id = s.id` are allowed on the " +
+            s"primary-key column. Found `${attr.sql} = ${expr.sql}`")
+        }
+
+        (attr, expr)
+    }
   }
 
   /**
-   * Get the mapping of target preCombineField to the source expression.
+   * Please check description for [[primaryKeyAttributeToConditionExpression]]
    */
-  private lazy val target2SourcePreCombineFiled: Option[(String, Expression)] = {
-    val updateActions = mergeInto.matchedActions.collect { case u: UpdateAction => u }
-    assert(updateActions.size <= 1, s"Only support one updateAction currently, current update action count is: ${updateActions.size}")
-
-    val updateAction = updateActions.headOption
-    hoodieCatalogTable.preCombineKey.map(preCombineField => {
-      val sourcePreCombineField =
-        updateAction.map(u => u.assignments.filter {
-            case Assignment(key: AttributeReference, _) => key.name.equalsIgnoreCase(preCombineField)
-            case _=> false
-          }.head.value
-        ).getOrElse {
-          // If there is no update action, mapping the target column to the source by order.
-          val target2Source = mergeInto.targetTable.output
-            .filter(attr => !isMetaField(attr.name))
-            .map(_.name)
-            .zip(mergeInto.sourceTable.output.filter(attr => !isMetaField(attr.name)))
-            .toMap
-          target2Source.getOrElse(preCombineField, null)
+  private lazy val preCombineAttributeAssociatedExpression: Option[(Attribute, Expression)] = {
+    val resolver = sparkSession.sessionState.analyzer.resolver
+    hoodieCatalogTable.preCombineKey.map { preCombineField =>
+      val targetPreCombineAttribute =
+        mergeInto.targetTable.output
+          .find { attr => resolver(attr.name, preCombineField) }
+          .get
+
+      // To find corresponding "pre-combine" attribute w/in the [[sourceTable]] we do
+      //    - Check if we can resolve the attribute w/in the source table as is; if unsuccessful, then
+      //    - Check if in any of the update actions, left-hand side of the assignment resolves to the
+      //    "pre-combine" field and its right-hand side resolves to a source attribute, in which case we
+      //    take the right-hand side expression as the value of "pre-combine" attribute w/in the [[sourceTable]]
+      val sourceExpr = {
+        mergeInto.sourceTable.output.find(attr => resolver(attr.name, preCombineField)) match {
+          case Some(attr) => attr
+          case None =>
+            updatingActions.flatMap(_.assignments).collectFirst {
+              case Assignment(attr: AttributeReference, expr)
+                if resolver(attr.name, preCombineField) && resolvesToSourceAttribute(expr) => expr
+            } getOrElse {
+              throw new AnalysisException(s"Failed to resolve pre-combine field `${preCombineField}` w/in the source-table output")
+            }
+
         }
-      (preCombineField, sourcePreCombineField)
-    }).filter(p => p._2 != null)
+      }
+
+      (targetPreCombineAttribute, sourceExpr)
+    }
   }
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     this.sparkSession = sparkSession
+    // TODO move to analysis phase
+    validate(mergeInto)

Review Comment:
   Nope. This is a future reference
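
The primary-key check in the hunk above accepts only a plain source attribute, possibly wrapped in a cast, on the opposite side of the comparison. A minimal sketch of that idea follows. It assumes a toy expression model: `Expr`, `Attr`, `Cast`, `Add` and `Lit` are hypothetical stand-ins and not Spark's Catalyst classes, and the actual body of `resolvesToSourceAttribute` is not shown in this thread.

```java
final class MergeConditionSketch {

  // Hypothetical stand-ins for Catalyst expressions (NOT Spark's classes)
  interface Expr {}

  static final class Attr implements Expr {
    final String table;
    final String name;
    Attr(String table, String name) { this.table = table; this.name = name; }
  }

  static final class Cast implements Expr {
    final Expr child;
    final String targetType;
    Cast(Expr child, String targetType) { this.child = child; this.targetType = targetType; }
  }

  static final class Add implements Expr {
    final Expr left;
    final Expr right;
    Add(Expr left, Expr right) { this.left = left; this.right = right; }
  }

  static final class Lit implements Expr {
    final int value;
    Lit(int value) { this.value = value; }
  }

  // True if the expression is a source-table attribute, possibly wrapped in casts
  static boolean resolvesToSourceAttribute(Expr expr, String sourceAlias) {
    Expr current = expr;
    while (current instanceof Cast) {
      current = ((Cast) current).child; // peel off casts one by one
    }
    return current instanceof Attr && ((Attr) current).table.equals(sourceAlias);
  }

  // Rejects conditions such as `t.id = s.id + 1`, which could alter the record's key
  static void validatePrimaryKeyCondition(Expr sourceExpr, String sourceAlias) {
    if (!resolvesToSourceAttribute(sourceExpr, sourceAlias)) {
      throw new IllegalArgumentException(
          "Only simple conditions of the form `t.id = s.id` are allowed on the primary-key column");
    }
  }

  public static void main(String[] args) {
    Attr sourceId = new Attr("s", "id");
    validatePrimaryKeyCondition(sourceId, "s");                      // t.id = s.id
    validatePrimaryKeyCondition(new Cast(sourceId, "bigint"), "s");  // t.id = cast(s.id as bigint)
    try {
      validatePrimaryKeyCondition(new Add(sourceId, new Lit(1)), "s"); // t.id = s.id + 1
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Peeling casts in a loop keeps `t.id = cast(s.id, ...)` valid while any arithmetic on the key column is rejected.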





[GitHub] [hudi] hudi-bot commented on pull request #7871: [HUDI-4690][HUDI-4503] Cleaning up Hudi custom Spark `Rule`s

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7871:
URL: https://github.com/apache/hudi/pull/7871#issuecomment-1441302450

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "be8c865ae5586420a0b171667f390110808e49dc",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14969",
       "triggerID" : "be8c865ae5586420a0b171667f390110808e49dc",
       "triggerType" : "PUSH"
     }, {
       "hash" : "286533b5f8edf1d85a49fcf6449fb11d4f4085d6",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14970",
       "triggerID" : "286533b5f8edf1d85a49fcf6449fb11d4f4085d6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9763c0f781d6166c16f9752b9c7447429bd5cf84",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15354",
       "triggerID" : "9763c0f781d6166c16f9752b9c7447429bd5cf84",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 286533b5f8edf1d85a49fcf6449fb11d4f4085d6 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14970) 
   * 9763c0f781d6166c16f9752b9c7447429bd5cf84 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15354) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #7871: [HUDI-4690][HUDI-4503] Cleaning up Hudi custom Spark `Rule`s

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7871:
URL: https://github.com/apache/hudi/pull/7871#issuecomment-1419862955

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "be8c865ae5586420a0b171667f390110808e49dc",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14969",
       "triggerID" : "be8c865ae5586420a0b171667f390110808e49dc",
       "triggerType" : "PUSH"
     }, {
       "hash" : "286533b5f8edf1d85a49fcf6449fb11d4f4085d6",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14970",
       "triggerID" : "286533b5f8edf1d85a49fcf6449fb11d4f4085d6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * be8c865ae5586420a0b171667f390110808e49dc Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14969) 
   * 286533b5f8edf1d85a49fcf6449fb11d4f4085d6 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14970) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #7871: [HUDI-4690][HUDI-4503] Cleaning up Hudi custom Spark `Rule`s

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7871:
URL: https://github.com/apache/hudi/pull/7871#issuecomment-1441485746

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "be8c865ae5586420a0b171667f390110808e49dc",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14969",
       "triggerID" : "be8c865ae5586420a0b171667f390110808e49dc",
       "triggerType" : "PUSH"
     }, {
       "hash" : "286533b5f8edf1d85a49fcf6449fb11d4f4085d6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14970",
       "triggerID" : "286533b5f8edf1d85a49fcf6449fb11d4f4085d6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9763c0f781d6166c16f9752b9c7447429bd5cf84",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15354",
       "triggerID" : "9763c0f781d6166c16f9752b9c7447429bd5cf84",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 9763c0f781d6166c16f9752b9c7447429bd5cf84 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15354) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] codope commented on a diff in pull request #7871: [HUDI-4690][HUDI-4503] Cleaning up Hudi custom Spark `Rule`s

Posted by "codope (via GitHub)" <gi...@apache.org>.
codope commented on code in PR #7871:
URL: https://github.com/apache/hudi/pull/7871#discussion_r1103803439


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala:
##########
@@ -75,7 +81,7 @@ trait HoodieCatalystExpressionUtils {
   def unapplyCastExpression(expr: Expression): Option[(Expression, DataType, Option[String], Boolean)]
 }
 
-object HoodieCatalystExpressionUtils {
+object HoodieCatalystExpressionUtils extends SparkAdapterSupport {

Review Comment:
   Why does it need to extend `SparkAdapterSupport`? Is there something that changes across Spark versions?



##########
hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java:
##########
@@ -69,6 +69,26 @@ public static boolean nonEmpty(Collection<?> c) {
     return !isNullOrEmpty(c);
   }
 
+  /**
+   * Reduces provided {@link Collection} using provided {@code reducer} applied to
+   * every element of the collection like following
+   *
+   * {@code reduce(reduce(reduce(identity, e1), e2), ...)}
+   *
+   * @param c target collection to be reduced
+   * @param identity element for reducing to start from
+   * @param reducer actual reducing operator
+   *
+   * @return result of the reduction of the collection using reducing operator
+   */
+  public static <T, U> U reduce(Collection<T> c, U identity, BiFunction<U, T, U> reducer) {
+    return c.stream()
+        .sequential()

Review Comment:
   Does it have to be strictly sequential? I mean, the elements of the collection should be independent of each other. Is there any value in parameterizing this behavior, say by adding a boolean `shouldReduceParallelly`?
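
For context on the question above: the three-argument `Stream.reduce(identity, accumulator, combiner)` needs a `BinaryOperator<U>` combiner to merge partial results when a stream runs in parallel. The proposed signature only takes a `BiFunction<U, T, U>`, so it has no way to merge two partial `U` values. Below is a minimal sketch of the documented left fold, written as a plain loop. The class name `ReduceSketch` is hypothetical, and the loop is not necessarily how the PR implements the method, since the hunk is truncated after `.sequential()`.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.BiFunction;

final class ReduceSketch {

  // Left fold: reduce(reduce(reduce(identity, e1), e2), ...).
  // A plain loop preserves encounter order and needs no combiner.
  static <T, U> U reduce(Collection<T> c, U identity, BiFunction<U, T, U> reducer) {
    U acc = identity;
    for (T element : c) {
      acc = reducer.apply(acc, element);
    }
    return acc;
  }

  public static void main(String[] args) {
    List<String> words = Arrays.asList("a", "bb", "ccc");

    // U = Integer, T = String: sums the lengths of the strings
    int totalLength = reduce(words, 0, (len, w) -> len + w.length());
    System.out.println(totalLength); // 6

    // Order-sensitive reducer: the result depends on folding left to right
    String joined = reduce(words, "", (s, w) -> s + w);
    System.out.println(joined); // abbccc
  }
}
```

A parallel variant would need an extra combiner parameter in addition to the suggested boolean, and the reducer would have to be associative.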



##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/action/TableChange.java:
##########
@@ -83,10 +83,16 @@ abstract class BaseColumnChange implements TableChange {
     protected final InternalSchema internalSchema;
     protected final Map<Integer, Integer> id2parent;
     protected final Map<Integer, ArrayList<ColumnPositionChange>> positionChangeMap = new HashMap<>();
+    protected final boolean caseSensitive;
 
     BaseColumnChange(InternalSchema schema) {
+      this(schema, false);

Review Comment:
   Why is the default for `caseSensitive` false?



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -28,97 +28,125 @@ import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, TB
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.sync.common.HoodieSyncConfig
+import org.apache.hudi.util.JFunction.scalaFunction1Noop
 import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, SparkAdapterSupport}
-import org.apache.spark.sql.HoodieCatalystExpressionUtils.MatchCast
+import org.apache.spark.sql.HoodieCatalystExpressionUtils.{MatchCast, attributeEquals}
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, Cast, EqualTo, Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReference
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, EqualTo, Expression, Literal, NamedExpression, PredicateHelper}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
-import org.apache.spark.sql.hudi.HoodieSqlUtils.getMergeIntoTargetTableId
+import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.failAnalysis
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig.combineOptions
-import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.CoercedAttributeReference
+import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.{CoercedAttributeReference, encodeAsBase64String, stripCasting, toStructType}
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig
-import org.apache.spark.sql.types.{BooleanType, StructType}
+import org.apache.spark.sql.types.{BooleanType, StructField, StructType}
 
 import java.util.Base64
 
 /**
- * The Command for hoodie MergeIntoTable.
- * The match on condition must contain the row key fields currently, so that we can use Hoodie
- * Index to speed up the performance.
+ * Hudi's implementation of the {@code MERGE INTO} (MIT) Spark SQL statement.
  *
- * The main algorithm:
+ * NOTE: This implementation is restricted in some aspects to accommodate Hudi's crucial
+ *       constraint (of requiring every record to bear a unique primary-key): the merging condition ([[mergeCondition]])
+ *       currently can only (and must) reference the target table's primary-key columns (this is necessary to
+ *       leverage Hudi's upserting capabilities, including Indexes)
  *
- * We pushed down all the matched and not matched (condition, assignment) expression pairs to the
- * ExpressionPayload. And the matched (condition, assignment) expression pairs will execute in the
- * ExpressionPayload#combineAndGetUpdateValue to compute the result record, while the not matched
- * expression pairs will execute in the ExpressionPayload#getInsertValue.
+ * Following algorithm is applied:
  *
- * For Mor table, it is a litter complex than this. The matched record also goes through the getInsertValue
- * and write append to the log. So the update actions & insert actions should process by the same
- * way. We pushed all the update actions & insert actions together to the
- * ExpressionPayload#getInsertValue.
+ * <ol>
+ *   <li>Incoming batch ([[sourceTable]]) is reshaped such that it bears correspondingly:
+ *   a) (required) "primary-key" column as well as b) (optional) "pre-combine" column; this is
+ *   required since MIT statements do not restrict the [[sourceTable]]'s schema to be aligned w/ the
+ *   [[targetTable]]'s one, while Hudi's upserting flow expects such columns to be present</li>

Review Comment:
   Is this always true? I think that if we enable Avro schema validation, which checks fields by name, MIT with different column names might fail.



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -554,4 +657,16 @@ object MergeIntoHoodieTableCommand {
     }
   }
 
+  def stripCasting(expr: EqualTo): EqualTo = expr match {
+    case EqualTo(MatchCast(leftExpr, leftTargetType, _, _), MatchCast(rightExpr, rightTargetType, _, _))
+      if leftTargetType.sameType(rightTargetType) => EqualTo(leftExpr, rightExpr)
+    case _ => expr
+  }
+
+  def toStructType(attrs: Seq[Attribute]): StructType =
+    StructType(attrs.map(a => StructField(a.qualifiedName.replace('.', '_'), a.dataType, a.nullable, a.metadata)))

Review Comment:
   Why do we need to replace periods with underscores?
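
To make the effect of `a.qualifiedName.replace('.', '_')` concrete, here is a small sketch of the name mapping. The class and method names are hypothetical. The thread does not state why the replacement is needed; one possible motivation (an assumption, not confirmed here) is that same-named source and target columns keep distinct, dot-free field names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class QualifiedNameSketch {

  // Mirrors the string transformation in the hunk: every '.' becomes '_'
  static String toFieldName(String qualifiedName) {
    return qualifiedName.replace('.', '_');
  }

  static List<String> toFieldNames(List<String> qualifiedNames) {
    return qualifiedNames.stream()
        .map(QualifiedNameSketch::toFieldName)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // "s.id" and "t.id" share the column name but stay distinct after flattening
    System.out.println(toFieldNames(Arrays.asList("s.id", "t.id", "s.ts")));
    // prints [s_id, t_id, s_ts]
  }
}
```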



##########
hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.analysis
+
+import org.apache.hudi.{DataSourceReadOptions, DefaultSource, SparkAdapterSupport}
+import org.apache.spark.sql.HoodieSpark3CatalystPlanUtils.MatchResolvedTable
+import org.apache.spark.sql.catalyst.analysis.UnresolvedPartitionSpec
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
+import org.apache.spark.sql.catalyst.plans.logcal.HoodieQuery
+import org.apache.spark.sql.catalyst.plans.logcal.HoodieQuery.parseOptions
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
+import org.apache.spark.sql.connector.catalog.{Table, V1Table}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
+import org.apache.spark.sql.hudi.ProvidesHoodieConfig
+import org.apache.spark.sql.hudi.analysis.HoodieSpark32PlusAnalysis.{HoodieV1OrV2Table, ResolvesToHudiTable}
+import org.apache.spark.sql.hudi.catalog.HoodieInternalV2Table
+import org.apache.spark.sql.hudi.command.{AlterHoodieTableDropPartitionCommand, ShowHoodieTablePartitionsCommand, TruncateHoodieTableCommand}
+import org.apache.spark.sql.{AnalysisException, SQLContext, SparkSession}
+
+/**
+ * NOTE: PLEASE READ CAREFULLY
+ *
+ * Since Hudi relations don't currently implement the DS V2 Read API, we have to fall back to V1 here.
+ * Such a fallback has a considerable performance impact, therefore it's only performed in cases
+ * where the V2 API has to be used. Currently, the only such use-case is the Schema Evolution feature
+ *
+ * Check out HUDI-4178 for more details
+ */
+case class HoodieDataSourceV2ToV1Fallback(sparkSession: SparkSession) extends Rule[LogicalPlan]

Review Comment:
   Can you help me understand why this is only needed for Spark 3.2.x?



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala:
##########
@@ -20,35 +20,44 @@ package org.apache.spark.sql.hudi.command
 import org.apache.hudi.SparkAdapterSupport
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter}
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig
+import org.apache.spark.sql.hudi.command.HoodieLeafRunnableCommand.stripMetaFieldAttributes
 
-case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends HoodieLeafRunnableCommand
-  with SparkAdapterSupport with ProvidesHoodieConfig {
+case class DeleteHoodieTableCommand(dft: DeleteFromTable) extends HoodieLeafRunnableCommand
+  with SparkAdapterSupport
+  with ProvidesHoodieConfig {
 
-  private val table = deleteTable.table
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalogTable = sparkAdapter.resolveHoodieTable(dft.table)
+      .map(HoodieCatalogTable(sparkSession, _))
+      .get
 
-  private val tableId = getTableIdentifier(table)
+    val tableId = catalogTable.table.qualifiedName
 
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    logInfo(s"start execute delete command for $tableId")
+    logInfo(s"Executing 'DELETE FROM' command for $tableId")
+
+    val condition = sparkAdapter.extractDeleteCondition(dft)
+
+    val targetLogicalPlan = stripMetaFieldAttributes(dft.table)

Review Comment:
   So what happens if a delete query is predicated on one of the meta fields, e.g. `DELETE FROM table where _hoodie_commit_time > 2023-01-01`?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -173,7 +172,21 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
     (avroSchema, internalSchemaOpt)
   }
 
-  protected lazy val tableStructSchema: StructType = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
+  protected lazy val tableStructSchema: StructType = {
+    val converted = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
+
+    val resolver = sparkSession.sessionState.analyzer.resolver
+    val metaFieldMetadata = sparkAdapter.createCatalystMetadataForMetaField
+
+    // TODO elaborate

Review Comment:
   I think the code below speaks for itself, but please remove the TODO and add a comment if you think it needs a description.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala:
##########
@@ -50,6 +50,9 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
     dir
   }
 
+  // NOTE: We need to set "spark.testing" property to make sure Spark can appropriately
+  //       recognize environment as testing
+  System.setProperty("spark.testing", "true")

Review Comment:
   Interesting! So is this property there for all Spark versions, and what are the benefits of setting it?
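
For context, here is a minimal sketch of how such a flag can be detected. It assumes the check amounts to looking for a `SPARK_TESTING` environment variable or a `spark.testing` system property, which is how Spark's internal `Utils.isTesting` behaves to my knowledge. The `TestingFlag` object and its methods are hypothetical and are not part of Spark or Hudi.

```scala
// Hypothetical helper mirroring (as an assumption) Spark's testing-mode detection.
object TestingFlag {
  // True when either the environment variable or the JVM system property is present.
  def isTesting(env: Map[String, String], props: Map[String, String]): Boolean =
    env.contains("SPARK_TESTING") || props.contains("spark.testing")

  // Convenience overload reading the real process environment and system properties.
  def isTesting: Boolean = isTesting(sys.env, sys.props.toMap)
}
```

Note that only the presence of the key matters in this sketch, not its value, so setting the property to `"true"` in a test base class would be enough to flip the check.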



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala:
##########
@@ -48,47 +47,19 @@ trait HoodieCatalystPlansUtils {
    */
   def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan
 
-  /**
-   * Convert a AliasIdentifier to TableIdentifier.
-   */
-  def toTableIdentifier(aliasId: AliasIdentifier): TableIdentifier
-
-  /**
-   * Convert a UnresolvedRelation to TableIdentifier.
-   */
-  def toTableIdentifier(relation: UnresolvedRelation): TableIdentifier
-
   /**
    * Create Join logical plan.
    */
   def createJoin(left: LogicalPlan, right: LogicalPlan, joinType: JoinType): Join
 
   /**
-   * Test if the logical plan is a Insert Into LogicalPlan.
-   */
-  def isInsertInto(plan: LogicalPlan): Boolean
-
-  /**
-   * Get the member of the Insert Into LogicalPlan.
+   * Decomposes [[InsertIntoStatement]] into its arguments allowing to accommodate for API
+   * changes in Spark 3.3
    */
-  def getInsertIntoChildren(plan: LogicalPlan):
-    Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)]
+  def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)]
 
-  /**
-   * if the logical plan is a TimeTravelRelation LogicalPlan.
-   */
-  def isRelationTimeTravel(plan: LogicalPlan): Boolean
-
-  /**
-   * Get the member of the TimeTravelRelation LogicalPlan.
-   */
-  def getRelationTimeTravel(plan: LogicalPlan): Option[(LogicalPlan, Option[Expression], Option[String])]
-
-  /**
-   * Create a Insert Into LogicalPlan.
-   */
-  def createInsertInto(table: LogicalPlan, partition: Map[String, Option[String]],
-                       query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean): LogicalPlan
+  // TODO scala-docs

Review Comment:
   Add docs or remove the comment?



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -539,6 +599,49 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
     combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
       defaultOpts = Map.empty, overridingOpts = overridingOpts)
   }
+
+
+  def validate(mit: MergeIntoTable): Unit = {
+    checkUpdatingActions(updatingActions)
+    checkInsertingActions(insertingActions)
+    checkDeletingActions(deletingActions)
+  }
+
+  private def checkDeletingActions(deletingActions: Seq[DeleteAction]): Unit = {
+    if (deletingActions.length > 1) {
+      throw new AnalysisException(s"Only one deleting action is supported in MERGE INTO statement (provided ${deletingActions.length})")
+    }
+  }
+
+  private def checkInsertingActions(insertActions: Seq[InsertAction]): Unit = {
+    insertActions.foreach(insert =>
+      assert(insert.assignments.length == targetTableSchema.length,
+        s"The number of insert assignments[${insert.assignments.length}] must equal to the " +
+          s"targetTable field size[${targetTableSchema.length}]"))
+
+  }
+
+  private def checkUpdatingActions(updateActions: Seq[UpdateAction]): Unit = {
+    if (updateActions.length > 1) {
+      throw new AnalysisException(s"Only one updating action is supported in MERGE INTO statement (provided ${updateActions.length})")
+    }
+
+    //updateActions.foreach(update =>
+    //  assert(update.assignments.length == targetTableSchema.length,
+    //    s"The number of update assignments[${update.assignments.length}] must equal to the " +
+    //      s"targetTable field size[${targetTableSchema.length}]"))

Review Comment:
   Remove the commented-out part?



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -127,164 +155,189 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
     // target table side (since we're gonna be matching against primary-key column as is) expression
     // on the opposite side of the comparison should be cast-able to the primary-key column's data-type
     // t/h "up-cast" (ie w/o any loss in precision)
-    val target2Source = cleanedConditions.map {
-      case EqualTo(CoercedAttributeReference(attr), expr)
-        if targetAttrs.exists(f => attributeEqual(f, attr, resolver)) =>
-          if (exprUtils.canUpCast(expr.dataType, attr.dataType)) {
-            targetAttrs.find(f => resolver(f.name, attr.name)).get.name ->
-              castIfNeeded(expr, attr.dataType, sparkSession.sqlContext.conf)
-          } else {
-            throw new AnalysisException(s"Invalid MERGE INTO matching condition: ${expr.sql}: "
-              + s"can't cast ${expr.sql} (of ${expr.dataType}) to ${attr.dataType}")
-          }
+    val targetAttr2ConditionExpressions = cleanedConditions.map {
+      case EqualTo(CoercedAttributeReference(attr), expr) if targetAttrs.exists(f => attributeEquals(f, attr)) =>
+        if (exprUtils.canUpCast(expr.dataType, attr.dataType)) {
+          // NOTE: It's critical we reference output attribute here and not the one from condition
+          val targetAttr = targetAttrs.find(f => attributeEquals(f, attr)).get
+          targetAttr -> castIfNeeded(expr, attr.dataType)
+        } else {
+          throw new AnalysisException(s"Invalid MERGE INTO matching condition: ${expr.sql}: "
+            + s"can't cast ${expr.sql} (of ${expr.dataType}) to ${attr.dataType}")
+        }
 
-      case EqualTo(expr, CoercedAttributeReference(attr))
-        if targetAttrs.exists(f => attributeEqual(f, attr, resolver)) =>
-          if (exprUtils.canUpCast(expr.dataType, attr.dataType)) {
-            targetAttrs.find(f => resolver(f.name, attr.name)).get.name ->
-              castIfNeeded(expr, attr.dataType, sparkSession.sqlContext.conf)
-          } else {
-            throw new AnalysisException(s"Invalid MERGE INTO matching condition: ${expr.sql}: "
-              + s"can't cast ${expr.sql} (of ${expr.dataType}) to ${attr.dataType}")
-          }
+      case EqualTo(expr, CoercedAttributeReference(attr)) if targetAttrs.exists(f => attributeEquals(f, attr)) =>
+        if (exprUtils.canUpCast(expr.dataType, attr.dataType)) {
+          // NOTE: It's critical we reference output attribute here and not the one from condition
+          val targetAttr = targetAttrs.find(f => attributeEquals(f, attr)).get
+          targetAttr -> castIfNeeded(expr, attr.dataType)
+        } else {
+          throw new AnalysisException(s"Invalid MERGE INTO matching condition: ${expr.sql}: "
+            + s"can't cast ${expr.sql} (of ${expr.dataType}) to ${attr.dataType}")
+        }
 
       case expr =>
         throw new AnalysisException(s"Invalid MERGE INTO matching condition: `${expr.sql}`: "
           + "expected condition should be 'target.id = <source-column-expr>', e.g. "
           + "`t.id = s.id` or `t.id = cast(s.id, ...)`")
-    }.toMap
+    }
 
-    target2Source
+    targetAttr2ConditionExpressions.collect {
+      case (attr, expr) if resolver(attr.name, primaryKeyField) =>
+        // NOTE: Here we validate that condition expression involving primary-key column(s) is a simple
+        //       attribute-reference expression (possibly wrapped into a cast). This is necessary to disallow
+        //       statements like following
+        //
+        //         MERGE INTO ... AS t USING (
+        //            SELECT ... FROM ... AS s
+        //         )
+        //            ON t.id = s.id + 1
+        //            WHEN MATCHED THEN UPDATE *
+        //
+        //       Which (in the current design) could result in a primary key of the record being modified,
+        //       which is not allowed.
+        if (!resolvesToSourceAttribute(expr)) {
+          throw new AnalysisException("Only simple conditions of the form `t.id = s.id` are allowed on the " +
+            s"primary-key column. Found `${attr.sql} = ${expr.sql}`")
+        }
+
+        (attr, expr)
+    }
   }
 
   /**
-   * Get the mapping of target preCombineField to the source expression.
+   * Please check description for [[primaryKeyAttributeToConditionExpression]]
    */
-  private lazy val target2SourcePreCombineFiled: Option[(String, Expression)] = {
-    val updateActions = mergeInto.matchedActions.collect { case u: UpdateAction => u }
-    assert(updateActions.size <= 1, s"Only support one updateAction currently, current update action count is: ${updateActions.size}")
-
-    val updateAction = updateActions.headOption
-    hoodieCatalogTable.preCombineKey.map(preCombineField => {
-      val sourcePreCombineField =
-        updateAction.map(u => u.assignments.filter {
-            case Assignment(key: AttributeReference, _) => key.name.equalsIgnoreCase(preCombineField)
-            case _=> false
-          }.head.value
-        ).getOrElse {
-          // If there is no update action, mapping the target column to the source by order.
-          val target2Source = mergeInto.targetTable.output
-            .filter(attr => !isMetaField(attr.name))
-            .map(_.name)
-            .zip(mergeInto.sourceTable.output.filter(attr => !isMetaField(attr.name)))
-            .toMap
-          target2Source.getOrElse(preCombineField, null)
+  private lazy val preCombineAttributeAssociatedExpression: Option[(Attribute, Expression)] = {
+    val resolver = sparkSession.sessionState.analyzer.resolver
+    hoodieCatalogTable.preCombineKey.map { preCombineField =>
+      val targetPreCombineAttribute =
+        mergeInto.targetTable.output
+          .find { attr => resolver(attr.name, preCombineField) }
+          .get
+
+      // To find corresponding "pre-combine" attribute w/in the [[sourceTable]] we do
+      //    - Check if we can resolve the attribute w/in the source table as is; if unsuccessful, then
+      //    - Check if in any of the update actions, right-hand side of the assignment actually resolves
+      //    to it, in which case we will determine left-hand side expression as the value of "pre-combine"
+      //    attribute w/in the [[sourceTable]]
+      val sourceExpr = {
+        mergeInto.sourceTable.output.find(attr => resolver(attr.name, preCombineField)) match {
+          case Some(attr) => attr
+          case None =>
+            updatingActions.flatMap(_.assignments).collectFirst {
+              case Assignment(attr: AttributeReference, expr)
+                if resolver(attr.name, preCombineField) && resolvesToSourceAttribute(expr) => expr
+            } getOrElse {
+              throw new AnalysisException(s"Failed to resolve pre-combine field `${preCombineField}` w/in the source-table output")
+            }
+
         }
-      (preCombineField, sourcePreCombineField)
-    }).filter(p => p._2 != null)
+      }
+
+      (targetPreCombineAttribute, sourceExpr)
+    }
   }
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     this.sparkSession = sparkSession
+    // TODO move to analysis phase
+    validate(mergeInto)

Review Comment:
   This is a good point. Do you intend to move it in this PR itself?
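
To make the discussed checks concrete, here is a rough, Spark-free sketch of the same validation written as a pure function, which is the shape it might take if it were moved out of `run` and into an analysis-time check. The action classes below are toy stand-ins for Spark's `UpdateAction`, `DeleteAction` and `InsertAction`, and `MergeValidation` is a hypothetical name; none of this is the PR's actual code.

```scala
// Toy stand-ins for Spark's MERGE INTO action classes (hypothetical, for illustration only).
sealed trait MergeAction
final case class UpdateAction(assignments: Seq[String]) extends MergeAction
final case class DeleteAction() extends MergeAction
final case class InsertAction(assignments: Seq[String]) extends MergeAction

object MergeValidation {
  // Returns Left(message) for the first violated rule, Right(()) when all rules hold.
  def validate(actions: Seq[MergeAction], targetFieldCount: Int): Either[String, Unit] = {
    val updates = actions.collect { case u: UpdateAction => u }
    val deletes = actions.collect { case d: DeleteAction => d }
    val inserts = actions.collect { case i: InsertAction => i }

    if (updates.length > 1) {
      Left(s"Only one updating action is supported (provided ${updates.length})")
    } else if (deletes.length > 1) {
      Left(s"Only one deleting action is supported (provided ${deletes.length})")
    } else {
      // Every insert must assign exactly as many values as the target table has fields.
      inserts.find(_.assignments.length != targetFieldCount) match {
        case Some(i) => Left(s"Insert has ${i.assignments.length} assignments, expected $targetFieldCount")
        case None => Right(())
      }
    }
  }
}
```

Returning an `Either` rather than throwing keeps the sketch easy to test; the hunk above throws `AnalysisException` (or asserts) instead.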



##########
hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark2Analysis.scala:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, CurrentTimestamp, Expression, ExtractValue, GetStructField, LambdaFunction}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment, DeleteAction, InsertAction, LogicalPlan, MergeIntoTable, Project, UpdateAction, Window}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.util.toPrettySQL
+
+/**
+ * NOTE: This code is borrowed from Spark 3.1.3
+ *       This code is borrowed, so that we can have some advanced Spark SQL functionality (like Merge Into, for ex)
+ *       in Spark 2.x
+ *
+ *       PLEASE REFRAIN MAKING ANY CHANGES TO THIS CODE UNLESS ABSOLUTELY NECESSARY
+ */
+object HoodieSpark2Analysis {

Review Comment:
   I thought MIT is supported in Spark 2.x. We have unit tests for MIT that run for Spark 2.x as well, don't we? If it's not supported, should we make that clear in the quickstart guide as well? And should we add this support in a separate PR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] codope merged pull request #7871: [HUDI-4690][HUDI-4503] Cleaning up Hudi custom Spark `Rule`s

Posted by "codope (via GitHub)" <gi...@apache.org>.
codope merged PR #7871:
URL: https://github.com/apache/hudi/pull/7871




[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #7871: [HUDI-4690][HUDI-4503] Cleaning up Hudi custom Spark `Rule`s

Posted by "alexeykudinkin (via GitHub)" <gi...@apache.org>.
alexeykudinkin commented on code in PR #7871:
URL: https://github.com/apache/hudi/pull/7871#discussion_r1097916787


##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -28,97 +28,125 @@ import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, TB
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.sync.common.HoodieSyncConfig
+import org.apache.hudi.util.JFunction.scalaFunction1Noop
 import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, SparkAdapterSupport}
-import org.apache.spark.sql.HoodieCatalystExpressionUtils.MatchCast
+import org.apache.spark.sql.HoodieCatalystExpressionUtils.{MatchCast, attributeEquals}
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, Cast, EqualTo, Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReference
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, EqualTo, Expression, Literal, NamedExpression, PredicateHelper}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
-import org.apache.spark.sql.hudi.HoodieSqlUtils.getMergeIntoTargetTableId
+import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.failAnalysis
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig.combineOptions
-import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.CoercedAttributeReference
+import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.{CoercedAttributeReference, encodeAsBase64String, stripCasting, toStructType}
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig
-import org.apache.spark.sql.types.{BooleanType, StructType}
+import org.apache.spark.sql.types.{BooleanType, StructField, StructType}
 
 import java.util.Base64
 
 /**
- * The Command for hoodie MergeIntoTable.
- * The match on condition must contain the row key fields currently, so that we can use Hoodie
- * Index to speed up the performance.
+ * Hudi's implementation of the {@code MERGE INTO} (MIT) Spark SQL statement.
  *
- * The main algorithm:
+ * NOTE: This implementation is restricted in some aspects to accommodate Hudi's crucial
+ *       constraint (of requiring every record to bear a unique primary-key): the merging condition ([[mergeCondition]])
+ *       currently can only (and must) reference the target table's primary-key columns (this is necessary to
+ *       leverage Hudi's upserting capabilities, including Indexes)
  *
- * We pushed down all the matched and not matched (condition, assignment) expression pairs to the
- * ExpressionPayload. And the matched (condition, assignment) expression pairs will execute in the
- * ExpressionPayload#combineAndGetUpdateValue to compute the result record, while the not matched
- * expression pairs will execute in the ExpressionPayload#getInsertValue.
+ * Following algorithm is applied:
  *
- * For Mor table, it is a litter complex than this. The matched record also goes through the getInsertValue
- * and write append to the log. So the update actions & insert actions should process by the same
- * way. We pushed all the update actions & insert actions together to the
- * ExpressionPayload#getInsertValue.
+ * <ol>
+ *   <li>Incoming batch ([[sourceTable]]) is reshaped such that it bears correspondingly:
+ *   a) (required) "primary-key" column as well as b) (optional) "pre-combine" column; this is
+ *   required since MIT statements do not restrict [[sourceTable]]s schema to be aligned w/ the
+ *   [[targetTable]]s one, while Hudi's upserting flow expects such columns to be present</li>
  *
+ *   <li>After reshaping we're writing [[sourceTable]] as a normal batch using Hudi's upserting
+ *   sequence, where special [[ExpressionPayload]] implementation of the [[HoodieRecordPayload]]
+ *   is used allowing us to execute updating, deleting and inserting clauses like following:</li>
+ *
+ *     <ol>
+ *       <li>All the matched {@code WHEN MATCHED AND ... THEN (DELETE|UPDATE ...)} conditional clauses
+ *       will produce [[(condition, expression)]] tuples that will be executed w/in the
+ *       [[ExpressionPayload#combineAndGetUpdateValue]] against existing (from [[targetTable]]) and
+ *       incoming (from [[sourceTable]]) records producing the updated one;</li>
+ *
+ *       <li>Not matched {@code WHEN NOT MATCHED AND ... THEN INSERT ...} conditional clauses
+ *       will produce [[(condition, expression)]] tuples that will be executed w/in [[ExpressionPayload#getInsertValue]]
+ *       against incoming records producing ones to be inserted into target table;</li>
+ *     </ol>
+ * </ol>
+ *
+ * TODO explain workflow for MOR tables
  */
 case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends HoodieLeafRunnableCommand

Review Comment:
   Changes to this command were required due to:
   
    - Switching from bespoke resolution to Spark's standard one: we need to abide by Spark's semantics and had to get rid of some customizations implemented previously
    - Cleaning up the code, considerably simplifying the implementation by getting rid of custom utilities and replacing them w/ Spark's standard ones (e.g., to resolve and bind expressions)
    - Adding documentation to elaborate on the overall workflow
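
The workflow described in the doc comment of this hunk (conditional clauses turned into `(condition, expression)` pairs and evaluated against the existing and incoming records) can be pictured with a toy model. This is only a sketch under stated assumptions: records are plain maps, the first clause whose condition holds wins, and an unmatched record is kept as is. `ClauseEvaluation`, `MatchedClause` and `combine` are hypothetical names and do not reflect how `ExpressionPayload` is actually implemented.

```scala
object ClauseEvaluation {
  type Record = Map[String, Any]

  // A conditional clause: a predicate over (existing, incoming) records plus an action
  // producing the merged record, or None to signal a delete.
  final case class MatchedClause(
    condition: (Record, Record) => Boolean,
    action: (Record, Record) => Option[Record])

  // Applies the first clause whose condition holds; keeps the existing record when none does
  // (the fallback is a simplification made by this sketch).
  def combine(clauses: Seq[MatchedClause], existing: Record, incoming: Record): Option[Record] =
    clauses.collectFirst {
      case c if c.condition(existing, incoming) => c.action(existing, incoming)
    }.getOrElse(Some(existing))
}
```

A `WHEN MATCHED AND ... THEN DELETE` clause maps to an action returning `None`, while an `UPDATE` clause returns the merged record.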



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala:
##########
@@ -128,15 +150,151 @@ object HoodieAnalysis {
     //       To work this around, we injecting this as the rule that trails pre-CBO, ie it's
     //          - Triggered before CBO, therefore have access to the same stats as CBO
     //          - Precedes actual [[customEarlyScanPushDownRules]] invocation
-    optimizerRules += (spark => HoodiePruneFileSourcePartitions(spark))
+    rules += (spark => HoodiePruneFileSourcePartitions(spark))
+
+    rules
+  }
+
+  /**
+   * This rule adjusts output of the [[LogicalRelation]] resolving into Hudi tables such that all of the
+   * default Spark resolution could be applied resolving standard Spark SQL commands
+   *
+   * <ul>
+   *  <li>`MERGE INTO ...`</li>
+   *  <li>`INSERT INTO ...`</li>
+   *  <li>`UPDATE ...`</li>
+   * </ul>
+   *
+   * even though Hudi tables might be carrying meta-fields that have to be ignored during resolution phase.
+   *
+   * Spark >= 3.2 bears fully-fledged support for meta-fields and such antics are not required for it:
+   * we just need to annotate corresponding attributes as "metadata" for Spark to be able to ignore it.
+   *
+   * In Spark < 3.2 however, this is worked around by simply removing any meta-fields from the output
+   * of the [[LogicalRelation]] resolving into Hudi table. Note that, it's a safe operation since we
+   * actually need to ignore these values anyway
+   */
+  case class AdaptIngestionTargetLogicalRelations(spark: SparkSession) extends Rule[LogicalPlan] {

Review Comment:
   This is the core of the change:
   
    - Here we remove the implementation of the bespoke resolution rules for Hudi components, instead relying on Spark to resolve these (most of these constructs don't have any custom logic relative to vanilla Spark SQL and therefore are perfectly fine being resolved by standard Spark resolution rules)
    - Instead we repurpose these rules to serve as a conversion point from Spark's standard constructs (like `MergeIntoTable`) into Hudi's ones that implement Hudi-specific semantics (`MergeIntoHoodieTableCommand`). Note that we require these constructs to be fully resolved before we try to convert them
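
The "resolve first, convert afterwards" idea can be sketched without Catalyst. The plan classes below are a deliberately tiny, hypothetical model (real `LogicalPlan` nodes are far richer), and `PostHocConversion` is an illustrative name rather than a rule from this PR. The only point shown is that conversion is skipped until the plan reports itself as resolved.

```scala
// Toy logical-plan model (hypothetical; not Catalyst's classes).
sealed trait Plan { def resolved: Boolean }
final case class Relation(name: String, isHudi: Boolean, resolved: Boolean) extends Plan
final case class MergeInto(target: Relation, source: Relation) extends Plan {
  // A merge is resolved only once both of its children are.
  def resolved: Boolean = target.resolved && source.resolved
}
final case class HudiMergeCommand(merge: MergeInto) extends Plan {
  val resolved: Boolean = true
}

object PostHocConversion {
  // Converts only fully resolved MERGE INTO plans targeting a Hudi table; leaves the rest alone.
  def apply(plan: Plan): Plan = plan match {
    case m @ MergeInto(target, _) if m.resolved && target.isHudi => HudiMergeCommand(m)
    case other => other
  }
}
```

Because unresolved plans pass through untouched, such a rule can safely run in the same batch as the engine's own resolution rules and simply fire on a later iteration.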



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala:
##########
@@ -276,337 +457,11 @@ case class HoodieAnalysis(sparkSession: SparkSession) extends Rule[LogicalPlan]
   }
 }
 
-/**
- * Rule for resolve hoodie's extended syntax or rewrite some logical plan.
- *
- * @param sparkSession
- */
-case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[LogicalPlan]

Review Comment:
   This is an example of a custom rule that is removed completely



##########
hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark2Analysis.scala:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, CurrentTimestamp, Expression, ExtractValue, GetStructField, LambdaFunction}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment, DeleteAction, InsertAction, LogicalPlan, MergeIntoTable, Project, UpdateAction, Window}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.util.toPrettySQL
+
+/**
+ * NOTE: This code is borrowed from Spark 3.1.3
+ *       This code is borrowed, so that we can have some advanced Spark SQL functionality (like Merge Into, for ex)
+ *       in Spark 2.x
+ *
+ *       PLEASE REFRAIN MAKING ANY CHANGES TO THIS CODE UNLESS ABSOLUTELY NECESSARY
+ */
+object HoodieSpark2Analysis {
+
+  case class ResolveReferences(spark: SparkSession) extends Rule[LogicalPlan] {
+
+    private val resolver = spark.sessionState.conf.resolver
+
+    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+      case m @ MergeIntoTable(targetTable, sourceTable, _, _, _)

Review Comment:
   These rules were borrowed from Spark 3.1.3 to bring support for the `MERGE INTO` statement to Spark 2.x, which doesn't have it out of the box





[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #7871: [HUDI-4690][HUDI-4503] Cleaning up Hudi custom Spark `Rule`s

Posted by "alexeykudinkin (via GitHub)" <gi...@apache.org>.
alexeykudinkin commented on code in PR #7871:
URL: https://github.com/apache/hudi/pull/7871#discussion_r1105004523


##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -554,4 +657,16 @@ object MergeIntoHoodieTableCommand {
     }
   }
 
+  def stripCasting(expr: EqualTo): EqualTo = expr match {
+    case EqualTo(MatchCast(leftExpr, leftTargetType, _, _), MatchCast(rightExpr, rightTargetType, _, _))
+      if leftTargetType.sameType(rightTargetType) => EqualTo(leftExpr, rightExpr)
+    case _ => expr
+  }
+
+  def toStructType(attrs: Seq[Attribute]): StructType =
+    StructType(attrs.map(a => StructField(a.qualifiedName.replace('.', '_'), a.dataType, a.nullable, a.metadata)))

Review Comment:
   Here we combine the output of the joining operation back into a schema so that we can assert that the records we receive in `ExpressionPayload` still adhere to the expected schema. We need to replace dots here because:
   
    - Names could be qualified (in case of nested structs; note that this schema is essentially a flattened one)
    - Field names can't have dots
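
A small, Spark-free sketch of the renaming described above. It assumes a qualified name is the qualifier parts and the attribute name joined with dots, which is how Catalyst's `qualifiedName` behaves to my knowledge. `FlattenedNames.fieldName` is a hypothetical helper, not code from this PR.

```scala
object FlattenedNames {
  // Joins the qualifier parts and the attribute name with dots,
  // then swaps every dot for an underscore to get a legal flat field name.
  def fieldName(qualifier: Seq[String], name: String): String =
    (qualifier :+ name).mkString(".").replace('.', '_')
}
```

For example, an attribute `id` qualified by `s` becomes the flat field `s_id`, while an unqualified attribute keeps its name.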





[GitHub] [hudi] hudi-bot commented on pull request #7871: [HUDI-4690][HUDI-4503] Cleaning up Hudi custom Spark `Rule`s

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7871:
URL: https://github.com/apache/hudi/pull/7871#issuecomment-1419832205

   ## CI report:
   
   * be8c865ae5586420a0b171667f390110808e49dc Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14969) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] codope commented on a diff in pull request #7871: [HUDI-4690][HUDI-4503] Cleaning up Hudi custom Spark `Rule`s

Posted by "codope (via GitHub)" <gi...@apache.org>.
codope commented on code in PR #7871:
URL: https://github.com/apache/hudi/pull/7871#discussion_r1111428840


##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala:
##########
@@ -20,35 +20,44 @@ package org.apache.spark.sql.hudi.command
 import org.apache.hudi.SparkAdapterSupport
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter}
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig
+import org.apache.spark.sql.hudi.command.HoodieLeafRunnableCommand.stripMetaFieldAttributes
 
-case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends HoodieLeafRunnableCommand
-  with SparkAdapterSupport with ProvidesHoodieConfig {
+case class DeleteHoodieTableCommand(dft: DeleteFromTable) extends HoodieLeafRunnableCommand
+  with SparkAdapterSupport
+  with ProvidesHoodieConfig {
 
-  private val table = deleteTable.table
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalogTable = sparkAdapter.resolveHoodieTable(dft.table)
+      .map(HoodieCatalogTable(sparkSession, _))
+      .get
 
-  private val tableId = getTableIdentifier(table)
+    val tableId = catalogTable.table.qualifiedName
 
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    logInfo(s"start execute delete command for $tableId")
+    logInfo(s"Executing 'DELETE FROM' command for $tableId")
+
+    val condition = sparkAdapter.extractDeleteCondition(dft)
+
+    val targetLogicalPlan = stripMetaFieldAttributes(dft.table)

Review Comment:
   OK, so you're saying we are essentially retaining the same behavior. I'm not sure why we remove meta fields, though. We should allow delete conditions that reference them. It's not the most common use case, but it is still worthwhile when users want to remove all records written since a given processing time. WDYT?
   cc @xiarixiaoyao @XuQianJin-Stars 
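
To make the concern above concrete, here is a minimal sketch that works on plain column names rather than Catalyst attributes. The five `_hoodie_*` names are Hudi's record-level meta columns; `stripMetaFields` and `canResolve` are hypothetical helpers for illustration only, and the sketch assumes (this is not confirmed anywhere in the thread) that the delete condition is resolved against the stripped column set.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class MetaFieldStripSketch {
  // Hudi's record-level meta columns.
  static final List<String> META_FIELDS = Arrays.asList(
      "_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
      "_hoodie_partition_path", "_hoodie_file_name");

  // Hypothetical analogue of stripping meta-field attributes, on column names only.
  static List<String> stripMetaFields(List<String> columns) {
    return columns.stream()
        .filter(c -> !META_FIELDS.contains(c))
        .collect(Collectors.toList());
  }

  // Hypothetical check: a condition on `column` resolves only if the column is visible.
  static boolean canResolve(List<String> visibleColumns, String column) {
    return visibleColumns.contains(column);
  }

  public static void main(String[] args) {
    List<String> all = Arrays.asList("_hoodie_commit_time", "_hoodie_record_key", "id", "ts");
    List<String> stripped = stripMetaFields(all);
    System.out.println(stripped);
    // A predicate such as `_hoodie_commit_time > '...'` needs the meta column to be visible.
    System.out.println(canResolve(all, "_hoodie_commit_time"));
    System.out.println(canResolve(stripped, "_hoodie_commit_time"));
  }
}
```

Under that assumption, a condition on `_hoodie_commit_time` has nothing to bind to once the meta columns are stripped, which is the use case the comment argues should be supported.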





[GitHub] [hudi] hudi-bot commented on pull request #7871: [HUDI-4690][HUDI-4503] Cleaning up Hudi custom Spark `Rule`s

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7871:
URL: https://github.com/apache/hudi/pull/7871#issuecomment-1419841817

   ## CI report:
   
   * be8c865ae5586420a0b171667f390110808e49dc Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14969) 
   * 286533b5f8edf1d85a49fcf6449fb11d4f4085d6 UNKNOWN
   


[GitHub] [hudi] hudi-bot commented on pull request #7871: [HUDI-4690][HUDI-4503] Cleaning up Hudi custom Spark `Rule`s

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7871:
URL: https://github.com/apache/hudi/pull/7871#issuecomment-1441296660

   ## CI report:
   
   * 286533b5f8edf1d85a49fcf6449fb11d4f4085d6 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14970) 
   * 9763c0f781d6166c16f9752b9c7447429bd5cf84 UNKNOWN
   


[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #7871: [HUDI-4690][HUDI-4503] Cleaning up Hudi custom Spark `Rule`s

Posted by "alexeykudinkin (via GitHub)" <gi...@apache.org>.
alexeykudinkin commented on code in PR #7871:
URL: https://github.com/apache/hudi/pull/7871#discussion_r1104985764


##########
hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java:
##########
@@ -69,6 +69,26 @@ public static boolean nonEmpty(Collection<?> c) {
     return !isNullOrEmpty(c);
   }
 
+  /**
+   * Reduces provided {@link Collection} using provided {@code reducer} applied to
+   * every element of the collection like following
+   *
+   * {@code reduce(reduce(reduce(identity, e1), e2), ...)}
+   *
+   * @param c target collection to be reduced
+   * @param identity element for reducing to start from
+   * @param reducer actual reducing operator
+   *
+   * @return result of the reduction of the collection using reducing operator
+   */
+  public static <T, U> U reduce(Collection<T> c, U identity, BiFunction<U, T, U> reducer) {
+    return c.stream()
+        .sequential()

Review Comment:
   [Reducing](https://en.wikipedia.org/wiki/Reduction_operator) is an inherently sequential operation. In this case I'm just creating a convenience wrapper that enforces this, in exchange for a simpler API than the one streams provide (anyone who wants to reduce in parallel can use the Streams API directly).
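
The diff above cuts off after `.sequential()`, so the rest of the method body is not visible here. As a rough sketch of the contract its Javadoc describes, a left-to-right fold can be written with a plain loop. `ReduceSketch` is a hypothetical class name, and the loop is a stand-in, not the implementation under review.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.BiFunction;

class ReduceSketch {
  // Computes reducer(reducer(reducer(identity, e1), e2), ...) in iteration order.
  static <T, U> U reduce(Collection<T> c, U identity, BiFunction<U, T, U> reducer) {
    U acc = identity;
    for (T e : c) {
      acc = reducer.apply(acc, e);
    }
    return acc;
  }

  public static void main(String[] args) {
    List<String> words = Arrays.asList("a", "bb", "ccc");
    // The accumulator type (Integer) differs from the element type (String).
    Integer totalLength = reduce(words, 0, (len, w) -> len + w.length());
    System.out.println(totalLength);
    // Order matters for non-commutative reducers such as concatenation.
    String joined = reduce(words, "", (s, w) -> s + w);
    System.out.println(joined);
  }
}
```

Because the accumulator and element types differ, the equivalent `Stream.reduce` overload would also need a combiner argument. A single `BiFunction` is the simplification the comment refers to.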



##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/action/TableChange.java:
##########
@@ -83,10 +83,16 @@ abstract class BaseColumnChange implements TableChange {
     protected final InternalSchema internalSchema;
     protected final Map<Integer, Integer> id2parent;
     protected final Map<Integer, ArrayList<ColumnPositionChange>> positionChangeMap = new HashMap<>();
+    protected final boolean caseSensitive;
 
     BaseColumnChange(InternalSchema schema) {
+      this(schema, false);

Review Comment:
   To keep the behavior compatible with how things work today.
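
The pattern in this hunk is plain constructor delegation: the existing one-argument constructor forwards a default so current callers see no change. A minimal sketch of the idea follows. `ColumnLookupSketch` and `matches` are hypothetical names, and how the real `caseSensitive` flag is consumed is not shown in the diff, so the name comparison here is an assumption.

```java
class ColumnLookupSketch {
  private final boolean caseSensitive;

  // Legacy entry point: delegates with the old default so existing callers are unaffected.
  ColumnLookupSketch() {
    this(false);
  }

  // New entry point: callers that care can opt in to case-sensitive matching.
  ColumnLookupSketch(boolean caseSensitive) {
    this.caseSensitive = caseSensitive;
  }

  // Assumed use of the flag: compare column names with or without regard to case.
  boolean matches(String declared, String requested) {
    return caseSensitive
        ? declared.equals(requested)
        : declared.equalsIgnoreCase(requested);
  }

  public static void main(String[] args) {
    System.out.println(new ColumnLookupSketch().matches("Id", "id"));
    System.out.println(new ColumnLookupSketch(true).matches("Id", "id"));
  }
}
```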



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala:
##########
@@ -75,7 +81,7 @@ trait HoodieCatalystExpressionUtils {
   def unapplyCastExpression(expr: Expression): Option[(Expression, DataType, Option[String], Boolean)]
 }
 
-object HoodieCatalystExpressionUtils {
+object HoodieCatalystExpressionUtils extends SparkAdapterSupport {

Review Comment:
   Yes, the adapter is needed to match the `Cast` expression (see the `MatchCast` object below).



##########
hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark2Analysis.scala:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, CurrentTimestamp, Expression, ExtractValue, GetStructField, LambdaFunction}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment, DeleteAction, InsertAction, LogicalPlan, MergeIntoTable, Project, UpdateAction, Window}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.util.toPrettySQL
+
+/**
+ * NOTE: This code is borrowed from Spark 3.1.3
+ *       This code is borrowed, so that we can have some advanced Spark SQL functionality (like Merge Into, for ex)
+ *       in Spark 2.x
+ *
+ *       PLEASE REFRAIN MAKING ANY CHANGES TO THIS CODE UNLESS ABSOLUTELY NECESSARY
+ */
+object HoodieSpark2Analysis {

Review Comment:
   It's supported. 
   
   The difference is that previously Hudi's bespoke MIT (MERGE INTO) resolution logic was applied to all Spark versions, not only Spark 2.x. Now:
    - For Spark 3.x, we rely on Spark's own resolution logic
    - For Spark 2.x, we use code back-ported from Spark 3.1.x



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala:
##########
@@ -20,35 +20,44 @@ package org.apache.spark.sql.hudi.command
 import org.apache.hudi.SparkAdapterSupport
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter}
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig
+import org.apache.spark.sql.hudi.command.HoodieLeafRunnableCommand.stripMetaFieldAttributes
 
-case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends HoodieLeafRunnableCommand
-  with SparkAdapterSupport with ProvidesHoodieConfig {
+case class DeleteHoodieTableCommand(dft: DeleteFromTable) extends HoodieLeafRunnableCommand
+  with SparkAdapterSupport
+  with ProvidesHoodieConfig {
 
-  private val table = deleteTable.table
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalogTable = sparkAdapter.resolveHoodieTable(dft.table)
+      .map(HoodieCatalogTable(sparkSession, _))
+      .get
 
-  private val tableId = getTableIdentifier(table)
+    val tableId = catalogTable.table.qualifiedName
 
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    logInfo(s"start execute delete command for $tableId")
+    logInfo(s"Executing 'DELETE FROM' command for $tableId")
+
+    val condition = sparkAdapter.extractDeleteCondition(dft)
+
+    val targetLogicalPlan = stripMetaFieldAttributes(dft.table)

Review Comment:
   Good call. I don't think this will work in the existing implementation.



##########
hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.analysis
+
+import org.apache.hudi.{DataSourceReadOptions, DefaultSource, SparkAdapterSupport}
+import org.apache.spark.sql.HoodieSpark3CatalystPlanUtils.MatchResolvedTable
+import org.apache.spark.sql.catalyst.analysis.UnresolvedPartitionSpec
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
+import org.apache.spark.sql.catalyst.plans.logcal.HoodieQuery
+import org.apache.spark.sql.catalyst.plans.logcal.HoodieQuery.parseOptions
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
+import org.apache.spark.sql.connector.catalog.{Table, V1Table}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
+import org.apache.spark.sql.hudi.ProvidesHoodieConfig
+import org.apache.spark.sql.hudi.analysis.HoodieSpark32PlusAnalysis.{HoodieV1OrV2Table, ResolvesToHudiTable}
+import org.apache.spark.sql.hudi.catalog.HoodieInternalV2Table
+import org.apache.spark.sql.hudi.command.{AlterHoodieTableDropPartitionCommand, ShowHoodieTablePartitionsCommand, TruncateHoodieTableCommand}
+import org.apache.spark.sql.{AnalysisException, SQLContext, SparkSession}
+
+/**
+ * NOTE: PLEASE READ CAREFULLY
+ *
+ * Since Hudi relations don't currently implement DS V2 Read API, we have to fallback to V1 here.
+ * Such fallback will have considerable performance impact, therefore it's only performed in cases
+ * where V2 API have to be used. Currently only such use-case is using of Schema Evolution feature
+ *
+ * Check out HUDI-4178 for more details
+ */
+case class HoodieDataSourceV2ToV1Fallback(sparkSession: SparkSession) extends Rule[LogicalPlan]

Review Comment:
   Because we're using DSv2 only in `HoodieCatalog`, which is only available for Spark >= 3.2.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala:
##########
@@ -50,6 +50,9 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
     dir
   }
 
+  // NOTE: We need to set "spark.testing" property to make sure Spark can appropriately
+  //       recognize environment as testing
+  System.setProperty("spark.testing", "true")

Review Comment:
   With this property set, we can override some of the configs that aren't configurable in a production environment.
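
As an illustration of the mechanism, a JVM system property can act as a switch that unlocks test-only overrides. The sketch below only mirrors that idea: `isTesting` and `resolveInternalSetting` are hypothetical helpers, and how Spark itself consults `spark.testing` is not shown in this thread.

```java
class SparkTestingFlagSketch {
  static final String TESTING_PROPERTY = "spark.testing";

  // Hypothetical helper: the environment counts as "testing" when the property is present.
  static boolean isTesting() {
    return System.getProperty(TESTING_PROPERTY) != null;
  }

  // Hypothetical guarded config: the requested value is honored only under test.
  static int resolveInternalSetting(int requested, int productionDefault) {
    return isTesting() ? requested : productionDefault;
  }

  public static void main(String[] args) {
    System.clearProperty(TESTING_PROPERTY);
    System.out.println(resolveInternalSetting(1, 100)); // override ignored

    // Same call the test base class makes before any test runs.
    System.setProperty(TESTING_PROPERTY, "true");
    System.out.println(resolveInternalSetting(1, 100)); // override honored

    System.clearProperty(TESTING_PROPERTY);
  }
}
```

Setting the property in the base test class, as the diff does, ensures it is in place before any test code reads it.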





[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #7871: [HUDI-4690][HUDI-4503] Cleaning up Hudi custom Spark `Rule`s

Posted by "alexeykudinkin (via GitHub)" <gi...@apache.org>.
alexeykudinkin commented on code in PR #7871:
URL: https://github.com/apache/hudi/pull/7871#discussion_r1097924911


##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/action/TableChanges.java:
##########
@@ -44,12 +43,12 @@ public class TableChanges {
   public static class ColumnUpdateChange extends TableChange.BaseColumnChange {
     private final Map<Integer, Types.Field> updates = new HashMap<>();
 
-    public static ColumnUpdateChange get(InternalSchema schema) {
-      return new ColumnUpdateChange(schema);
+    private ColumnUpdateChange(InternalSchema schema) {

Review Comment:
   These changes are necessary to properly handle various case-sensitivity settings





[GitHub] [hudi] hudi-bot commented on pull request #7871: [HUDI-4690][HUDI-4503] Cleaning up Hudi custom Spark `Rule`s

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7871:
URL: https://github.com/apache/hudi/pull/7871#issuecomment-1419767182

   ## CI report:
   
   * be8c865ae5586420a0b171667f390110808e49dc Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14969) 
   


[GitHub] [hudi] hudi-bot commented on pull request #7871: [HUDI-4690][HUDI-4503] Cleaning up Hudi custom Spark `Rule`s

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7871:
URL: https://github.com/apache/hudi/pull/7871#issuecomment-1419759493

   ## CI report:
   
   * be8c865ae5586420a0b171667f390110808e49dc UNKNOWN
   


[GitHub] [hudi] hudi-bot commented on pull request #7871: [HUDI-4690][HUDI-4503] Cleaning up Hudi custom Spark `Rule`s

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7871:
URL: https://github.com/apache/hudi/pull/7871#issuecomment-1420018284

   ## CI report:
   
   * 286533b5f8edf1d85a49fcf6449fb11d4f4085d6 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14970) 
   