You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/07/12 22:26:13 UTC

[GitHub] [hudi] CTTY commented on a diff in pull request #5943: [HUDI-4186] [WIP] Support Hudi with Spark 3.3.0

CTTY commented on code in PR #5943:
URL: https://github.com/apache/hudi/pull/5943#discussion_r919474750


##########
hudi-spark-datasource/hudi-spark3.3/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlAstBuilder.scala:
##########
@@ -0,0 +1,3351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.parser
+
+import org.antlr.v4.runtime.tree.{ParseTree, RuleNode, TerminalNode}
+import org.antlr.v4.runtime.{ParserRuleContext, Token}
+import org.apache.hudi.spark.sql.parser.HoodieSqlBaseParser._
+import org.apache.hudi.spark.sql.parser.{HoodieSqlBaseBaseVisitor, HoodieSqlBaseParser}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
+import org.apache.spark.sql.catalyst.parser.ParserUtils.{EnhancedLogicalPlan, checkDuplicateClauses, checkDuplicateKeys, entry, escapedIdentifier, operationNotAllowed, source, string, stringWithoutUnescape, validate, withOrigin}
+import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils._
+import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils, IntervalUtils, truncatedString}
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.BucketSpecHelper
+import org.apache.spark.sql.connector.catalog.TableCatalog
+import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
+import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform, Expression => V2Expression}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+import org.apache.spark.util.Utils.isTesting
+import org.apache.spark.util.random.RandomSampler
+
+import java.util.Locale
+import java.util.concurrent.TimeUnit
+import javax.xml.bind.DatatypeConverter
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * The AstBuilder for HoodieSqlParser to parser the AST tree to Logical Plan.
+ * Here we only do the parser for the extended sql syntax. e.g MergeInto. For
+ * other sql syntax we use the delegate sql parser which is the SparkSqlParser.
+ */
+class HoodieSpark3_3ExtendedSqlAstBuilder(conf: SQLConf, delegate: ParserInterface)
+  extends HoodieSqlBaseBaseVisitor[AnyRef] with Logging {
+
+  protected def typedVisit[T](ctx: ParseTree): T = {
+    ctx.accept(this).asInstanceOf[T]
+  }
+
+  /**
+   * Override the default behavior for all visit methods. This will only return a non-null result
+   * when the context has only one child. This is done because there is no generic method to
+   * combine the results of the context children. In all other cases null is returned.
+   */
+  override def visitChildren(node: RuleNode): AnyRef = {
+    if (node.getChildCount == 1) {
+      node.getChild(0).accept(this)
+    } else {
+      null
+    }
+  }
+
+  /**
+   * Create an aliased table reference. This is typically used in FROM clauses.
+   */
+  override def visitTableName(ctx: TableNameContext): LogicalPlan = withOrigin(ctx) {
+    val tableId = visitMultipartIdentifier(ctx.multipartIdentifier())
+    val relation = UnresolvedRelation(tableId)
+    val table = mayApplyAliasPlan(
+      ctx.tableAlias, relation.optionalMap(ctx.temporalClause)(withTimeTravel))
+    table.optionalMap(ctx.sample)(withSample)
+  }
+
+  private def withTimeTravel(
+                              ctx: TemporalClauseContext, plan: LogicalPlan): LogicalPlan = withOrigin(ctx) {
+    val v = ctx.version
+    val version = if (ctx.INTEGER_VALUE != null) {
+      Some(v.getText)
+    } else {
+      Option(v).map(string)
+    }
+
+    val timestamp = Option(ctx.timestamp).map(expression)
+    if (timestamp.exists(_.references.nonEmpty)) {
+      throw new ParseException(
+        "timestamp expression cannot refer to any columns", ctx.timestamp)
+    }
+    if (timestamp.exists(e => SubqueryExpression.hasSubquery(e))) {
+      throw new ParseException(
+        "timestamp expression cannot contain subqueries", ctx.timestamp)
+    }
+
+    TimeTravelRelation(plan, timestamp, version)
+  }
+
+  // ============== The following code is fork from org.apache.spark.sql.catalyst.parser.AstBuilder
+  override def visitSingleStatement(ctx: SingleStatementContext): LogicalPlan = withOrigin(ctx) {
+    visit(ctx.statement).asInstanceOf[LogicalPlan]
+  }
+
+  override def visitSingleExpression(ctx: SingleExpressionContext): Expression = withOrigin(ctx) {
+    visitNamedExpression(ctx.namedExpression)
+  }
+
+  override def visitSingleTableIdentifier(
+                                           ctx: SingleTableIdentifierContext): TableIdentifier = withOrigin(ctx) {
+    visitTableIdentifier(ctx.tableIdentifier)
+  }
+
+  override def visitSingleFunctionIdentifier(
+                                              ctx: SingleFunctionIdentifierContext): FunctionIdentifier = withOrigin(ctx) {
+    visitFunctionIdentifier(ctx.functionIdentifier)
+  }
+
+  override def visitSingleMultipartIdentifier(
+                                               ctx: SingleMultipartIdentifierContext): Seq[String] = withOrigin(ctx) {
+    visitMultipartIdentifier(ctx.multipartIdentifier)
+  }
+
+  override def visitSingleDataType(ctx: SingleDataTypeContext): DataType = withOrigin(ctx) {
+    typedVisit[DataType](ctx.dataType)
+  }
+
+  override def visitSingleTableSchema(ctx: SingleTableSchemaContext): StructType = {
+    val schema = StructType(visitColTypeList(ctx.colTypeList))
+    withOrigin(ctx)(schema)
+  }
+
+  /* ********************************************************************************************
+   * Plan parsing
+   * ******************************************************************************************** */
+  protected def plan(tree: ParserRuleContext): LogicalPlan = typedVisit(tree)
+
+  /**
+   * Create a top-level plan with Common Table Expressions.
+   */
+  override def visitQuery(ctx: QueryContext): LogicalPlan = withOrigin(ctx) {
+    val query = plan(ctx.queryTerm).optionalMap(ctx.queryOrganization)(withQueryResultClauses)
+
+    // Apply CTEs
+    query.optionalMap(ctx.ctes)(withCTE)
+  }
+
+  override def visitDmlStatement(ctx: DmlStatementContext): AnyRef = withOrigin(ctx) {
+    val dmlStmt = plan(ctx.dmlStatementNoWith)
+    // Apply CTEs
+    dmlStmt.optionalMap(ctx.ctes)(withCTE)
+  }
+
+  private def withCTE(ctx: CtesContext, plan: LogicalPlan): LogicalPlan = {
+    val ctes = ctx.namedQuery.asScala.map { nCtx =>
+      val namedQuery = visitNamedQuery(nCtx)
+      (namedQuery.alias, namedQuery)
+    }
+    // Check for duplicate names.
+    val duplicates = ctes.groupBy(_._1).filter(_._2.size > 1).keys
+    if (duplicates.nonEmpty) {
+      throw new ParseException(s"CTE definition can't have duplicate names: ${duplicates.mkString("'", "', '", "'")}.", ctx)
+    }
+    UnresolvedWith(plan, ctes.toSeq)
+  }
+
+  /**
+   * Create a logical query plan for a hive-style FROM statement body.
+   */
+  private def withFromStatementBody(
+                                     ctx: FromStatementBodyContext, plan: LogicalPlan): LogicalPlan = withOrigin(ctx) {
+    // two cases for transforms and selects
+    if (ctx.transformClause != null) {
+      withTransformQuerySpecification(
+        ctx,
+        ctx.transformClause,
+        ctx.lateralView,
+        ctx.whereClause,
+        ctx.aggregationClause,
+        ctx.havingClause,
+        ctx.windowClause,
+        plan
+      )
+    } else {
+      withSelectQuerySpecification(
+        ctx,
+        ctx.selectClause,
+        ctx.lateralView,
+        ctx.whereClause,
+        ctx.aggregationClause,
+        ctx.havingClause,
+        ctx.windowClause,
+        plan
+      )
+    }
+  }
+
+  override def visitFromStatement(ctx: FromStatementContext): LogicalPlan = withOrigin(ctx) {
+    val from = visitFromClause(ctx.fromClause)
+    val selects = ctx.fromStatementBody.asScala.map { body =>
+      withFromStatementBody(body, from).
+        // Add organization statements.
+        optionalMap(body.queryOrganization)(withQueryResultClauses)
+    }
+    // If there are multiple SELECT just UNION them together into one query.
+    if (selects.length == 1) {
+      selects.head
+    } else {
+      Union(selects.toSeq)
+    }
+  }
+
+  /**
+   * Create a named logical plan.
+   *
+   * This is only used for Common Table Expressions.
+   */
+  override def visitNamedQuery(ctx: NamedQueryContext): SubqueryAlias = withOrigin(ctx) {
+    val subQuery: LogicalPlan = plan(ctx.query).optionalMap(ctx.columnAliases)(
+      (columnAliases, plan) =>
+        UnresolvedSubqueryColumnAliases(visitIdentifierList(columnAliases), plan)
+    )
+    SubqueryAlias(ctx.name.getText, subQuery)
+  }
+
+  /**
+   * Create a logical plan which allows for multiple inserts using one 'from' statement. These
+   * queries have the following SQL form:
+   * {{{
+   *   [WITH cte...]?
+   *   FROM src
+   *   [INSERT INTO tbl1 SELECT *]+
+   * }}}
+   * For example:
+   * {{{
+   *   FROM db.tbl1 A
+   *   INSERT INTO dbo.tbl1 SELECT * WHERE A.value = 10 LIMIT 5
+   *   INSERT INTO dbo.tbl2 SELECT * WHERE A.value = 12
+   * }}}
+   * This (Hive) feature cannot be combined with set-operators.
+   */
+  override def visitMultiInsertQuery(ctx: MultiInsertQueryContext): LogicalPlan = withOrigin(ctx) {
+    val from = visitFromClause(ctx.fromClause)
+
+    // Build the insert clauses.
+    val inserts = ctx.multiInsertQueryBody.asScala.map { body =>
+      withInsertInto(body.insertInto,
+        withFromStatementBody(body.fromStatementBody, from).
+          optionalMap(body.fromStatementBody.queryOrganization)(withQueryResultClauses))
+    }
+
+    // If there are multiple INSERTS just UNION them together into one query.
+    if (inserts.length == 1) {
+      inserts.head
+    } else {
+      Union(inserts.toSeq)
+    }
+  }
+
+  /**
+   * Create a logical plan for a regular (single-insert) query.
+   */
+  override def visitSingleInsertQuery(
+                                       ctx: SingleInsertQueryContext): LogicalPlan = withOrigin(ctx) {
+    withInsertInto(
+      ctx.insertInto(),
+      plan(ctx.queryTerm).optionalMap(ctx.queryOrganization)(withQueryResultClauses))
+  }
+
+  /**
+   * Parameters used for writing query to a table:
+   * (UnresolvedRelation, tableColumnList, partitionKeys, ifPartitionNotExists).
+   */
+  type InsertTableParams = (UnresolvedRelation, Seq[String], Map[String, Option[String]], Boolean)
+
+  /**
+   * Parameters used for writing query to a directory: (isLocal, CatalogStorageFormat, provider).
+   */
+  type InsertDirParams = (Boolean, CatalogStorageFormat, Option[String])
+
+  /**
+   * Add an
+   * {{{
+   *   INSERT OVERWRITE TABLE tableIdentifier [partitionSpec [IF NOT EXISTS]]? [identifierList]
+   *   INSERT INTO [TABLE] tableIdentifier [partitionSpec]  [identifierList]
+   *   INSERT OVERWRITE [LOCAL] DIRECTORY STRING [rowFormat] [createFileFormat]
+   *   INSERT OVERWRITE [LOCAL] DIRECTORY [STRING] tableProvider [OPTIONS tablePropertyList]
+   * }}}
+   * operation to logical plan
+   */
+  private def withInsertInto(
+                              ctx: InsertIntoContext,
+                              query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
+    ctx match {
+      case table: InsertIntoTableContext =>
+        val (relation, cols, partition, ifPartitionNotExists) = visitInsertIntoTable(table)
+        InsertIntoStatement(
+          relation,
+          partition,
+          cols,
+          query,
+          overwrite = false,
+          ifPartitionNotExists)
+      case table: InsertOverwriteTableContext =>
+        val (relation, cols, partition, ifPartitionNotExists) = visitInsertOverwriteTable(table)
+        InsertIntoStatement(
+          relation,
+          partition,
+          cols,
+          query,
+          overwrite = true,
+          ifPartitionNotExists)
+      case dir: InsertOverwriteDirContext =>
+        val (isLocal, storage, provider) = visitInsertOverwriteDir(dir)
+        InsertIntoDir(isLocal, storage, provider, query, overwrite = true)
+      case hiveDir: InsertOverwriteHiveDirContext =>
+        val (isLocal, storage, provider) = visitInsertOverwriteHiveDir(hiveDir)
+        InsertIntoDir(isLocal, storage, provider, query, overwrite = true)
+      case _ =>
+        throw new ParseException("Invalid InsertIntoContext", ctx)
+    }
+  }
+
+  /**
+   * Add an INSERT INTO TABLE operation to the logical plan.
+   */
+  override def visitInsertIntoTable(
+                                     ctx: InsertIntoTableContext): InsertTableParams = withOrigin(ctx) {
+    val cols = Option(ctx.identifierList()).map(visitIdentifierList).getOrElse(Nil)
+    val partitionKeys = Option(ctx.partitionSpec).map(visitPartitionSpec).getOrElse(Map.empty)
+
+    if (ctx.EXISTS != null) {
+      operationNotAllowed("INSERT INTO ... IF NOT EXISTS", ctx)
+    }
+
+    (createUnresolvedRelation(ctx.multipartIdentifier), cols, partitionKeys, false)
+  }
+
+  /**
+   * Add an INSERT OVERWRITE TABLE operation to the logical plan.
+   */
+  override def visitInsertOverwriteTable(
+                                          ctx: InsertOverwriteTableContext): InsertTableParams = withOrigin(ctx) {
+    assert(ctx.OVERWRITE() != null)
+    val cols = Option(ctx.identifierList()).map(visitIdentifierList).getOrElse(Nil)
+    val partitionKeys = Option(ctx.partitionSpec).map(visitPartitionSpec).getOrElse(Map.empty)
+
+    val dynamicPartitionKeys: Map[String, Option[String]] = partitionKeys.filter(_._2.isEmpty)
+    if (ctx.EXISTS != null && dynamicPartitionKeys.nonEmpty) {
+      operationNotAllowed("IF NOT EXISTS with dynamic partitions: " +
+        dynamicPartitionKeys.keys.mkString(", "), ctx)
+    }
+
+    (createUnresolvedRelation(ctx.multipartIdentifier), cols, partitionKeys, ctx.EXISTS() != null)
+  }
+
+  /**
+   * Write to a directory, returning a [[InsertIntoDir]] logical plan.
+   */
+  override def visitInsertOverwriteDir(
+                                        ctx: InsertOverwriteDirContext): InsertDirParams = withOrigin(ctx) {
+    throw new ParseException("INSERT OVERWRITE DIRECTORY is not supported", ctx)
+  }
+
+  /**
+   * Write to a directory, returning a [[InsertIntoDir]] logical plan.
+   */
+  override def visitInsertOverwriteHiveDir(
+                                            ctx: InsertOverwriteHiveDirContext): InsertDirParams = withOrigin(ctx) {
+    throw new ParseException("INSERT OVERWRITE DIRECTORY is not supported", ctx)
+  }
+
+  private def getTableAliasWithoutColumnAlias(
+                                               ctx: TableAliasContext, op: String): Option[String] = {
+    if (ctx == null) {
+      None
+    } else {
+      val ident = ctx.strictIdentifier()
+      if (ctx.identifierList() != null) {
+        throw new ParseException(s"Columns aliases are not allowed in $op.", ctx.identifierList())
+      }
+      if (ident != null) Some(ident.getText) else None
+    }
+  }
+
+  override def visitDeleteFromTable(
+                                     ctx: DeleteFromTableContext): LogicalPlan = withOrigin(ctx) {
+    val table = createUnresolvedRelation(ctx.multipartIdentifier())
+    val tableAlias = getTableAliasWithoutColumnAlias(ctx.tableAlias(), "DELETE")
+    val aliasedTable = tableAlias.map(SubqueryAlias(_, table)).getOrElse(table)
+    val predicate = if (ctx.whereClause() != null) {
+      Some(expression(ctx.whereClause().booleanExpression()))
+    } else {
+      None
+    }
+    DeleteFromTable(aliasedTable, predicate.get)
+  }
+
+  override def visitUpdateTable(ctx: UpdateTableContext): LogicalPlan = withOrigin(ctx) {
+    val table = createUnresolvedRelation(ctx.multipartIdentifier())
+    val tableAlias = getTableAliasWithoutColumnAlias(ctx.tableAlias(), "UPDATE")
+    val aliasedTable = tableAlias.map(SubqueryAlias(_, table)).getOrElse(table)
+    val assignments = withAssignments(ctx.setClause().assignmentList())
+    val predicate = if (ctx.whereClause() != null) {
+      Some(expression(ctx.whereClause().booleanExpression()))
+    } else {
+      None
+    }
+
+    UpdateTable(aliasedTable, assignments, predicate)
+  }
+
+  private def withAssignments(assignCtx: AssignmentListContext): Seq[Assignment] =
+    withOrigin(assignCtx) {
+      assignCtx.assignment().asScala.map { assign =>
+        Assignment(UnresolvedAttribute(visitMultipartIdentifier(assign.key)),
+          expression(assign.value))
+      }.toSeq
+    }
+
+  override def visitMergeIntoTable(ctx: MergeIntoTableContext): LogicalPlan = withOrigin(ctx) {
+    val targetTable = createUnresolvedRelation(ctx.target)
+    val targetTableAlias = getTableAliasWithoutColumnAlias(ctx.targetAlias, "MERGE")
+    val aliasedTarget = targetTableAlias.map(SubqueryAlias(_, targetTable)).getOrElse(targetTable)
+
+    val sourceTableOrQuery = if (ctx.source != null) {
+      createUnresolvedRelation(ctx.source)
+    } else if (ctx.sourceQuery != null) {
+      visitQuery(ctx.sourceQuery)
+    } else {
+      throw new ParseException("Empty source for merge: you should specify a source" +
+        " table/subquery in merge.", ctx.source)
+    }
+    val sourceTableAlias = getTableAliasWithoutColumnAlias(ctx.sourceAlias, "MERGE")
+    val aliasedSource =
+      sourceTableAlias.map(SubqueryAlias(_, sourceTableOrQuery)).getOrElse(sourceTableOrQuery)
+
+    val mergeCondition = expression(ctx.mergeCondition)
+
+    val matchedActions = ctx.matchedClause().asScala.map {
+      clause => {
+        if (clause.matchedAction().DELETE() != null) {
+          DeleteAction(Option(clause.matchedCond).map(expression))
+        } else if (clause.matchedAction().UPDATE() != null) {
+          val condition = Option(clause.matchedCond).map(expression)
+          if (clause.matchedAction().ASTERISK() != null) {
+            UpdateStarAction(condition)
+          } else {
+            UpdateAction(condition, withAssignments(clause.matchedAction().assignmentList()))
+          }
+        } else {
+          // It should not be here.
+          throw new ParseException(s"Unrecognized matched action: ${clause.matchedAction().getText}",
+            clause.matchedAction())
+        }
+      }
+    }
+    val notMatchedActions = ctx.notMatchedClause().asScala.map {
+      clause => {
+        if (clause.notMatchedAction().INSERT() != null) {
+          val condition = Option(clause.notMatchedCond).map(expression)
+          if (clause.notMatchedAction().ASTERISK() != null) {
+            InsertStarAction(condition)
+          } else {
+            val columns = clause.notMatchedAction().columns.multipartIdentifier()
+              .asScala.map(attr => UnresolvedAttribute(visitMultipartIdentifier(attr)))
+            val values = clause.notMatchedAction().expression().asScala.map(expression)
+            if (columns.size != values.size) {
+              throw new ParseException("The number of inserted values cannot match the fields.",
+                clause.notMatchedAction())
+            }
+            InsertAction(condition, columns.zip(values).map(kv => Assignment(kv._1, kv._2)).toSeq)
+          }
+        } else {
+          // It should not be here.
+          throw new ParseException(s"Unrecognized not matched action: ${clause.notMatchedAction().getText}",
+            clause.notMatchedAction())
+        }
+      }
+    }
+    if (matchedActions.isEmpty && notMatchedActions.isEmpty) {
+      throw new ParseException("There must be at least one WHEN clause in a MERGE statement", ctx)
+    }
+    // children being empty means that the condition is not set
+    val matchedActionSize = matchedActions.length
+    if (matchedActionSize >= 2 && !matchedActions.init.forall(_.condition.nonEmpty)) {
+      throw new ParseException("When there are more than one MATCHED clauses in a MERGE " +
+        "statement, only the last MATCHED clause can omit the condition.", ctx)
+    }
+    val notMatchedActionSize = notMatchedActions.length
+    if (notMatchedActionSize >= 2 && !notMatchedActions.init.forall(_.condition.nonEmpty)) {
+      throw new ParseException("When there are more than one NOT MATCHED clauses in a MERGE " +
+        "statement, only the last NOT MATCHED clause can omit the condition.", ctx)
+    }
+
+    MergeIntoTable(
+      aliasedTarget,
+      aliasedSource,
+      mergeCondition,
+      matchedActions.toSeq,
+      notMatchedActions.toSeq)
+  }
+
+  /**
+   * Create a partition specification map.
+   */
+  override def visitPartitionSpec(
+                                   ctx: PartitionSpecContext): Map[String, Option[String]] = withOrigin(ctx) {
+    val legacyNullAsString =
+      conf.getConf(SQLConf.LEGACY_PARSE_NULL_PARTITION_SPEC_AS_STRING_LITERAL)
+    val parts = ctx.partitionVal.asScala.map { pVal =>
+      val name = pVal.identifier.getText
+      val value = Option(pVal.constant).map(v => visitStringConstant(v, legacyNullAsString))
+      name -> value
+    }
+    // Before calling `toMap`, we check duplicated keys to avoid silently ignore partition values
+    // in partition spec like PARTITION(a='1', b='2', a='3'). The real semantical check for
+    // partition columns will be done in analyzer.
+    if (conf.caseSensitiveAnalysis) {
+      checkDuplicateKeys(parts.toSeq, ctx)
+    } else {
+      checkDuplicateKeys(parts.map(kv => kv._1.toLowerCase(Locale.ROOT) -> kv._2).toSeq, ctx)
+    }
+    parts.toMap
+  }
+
+  /**
+   * Create a partition specification map without optional values.
+   */
+  protected def visitNonOptionalPartitionSpec(
+                                               ctx: PartitionSpecContext): Map[String, String] = withOrigin(ctx) {
+    visitPartitionSpec(ctx).map {
+      case (key, None) => throw new ParseException(s"Found an empty partition key '$key'.", ctx)
+      case (key, Some(value)) => key -> value
+    }
+  }
+
+  /**
+   * Convert a constant of any type into a string. This is typically used in DDL commands, and its
+   * main purpose is to prevent slight differences due to back to back conversions i.e.:
+   * String -> Literal -> String.
+   */
+  protected def visitStringConstant(
+                                     ctx: ConstantContext,
+                                     legacyNullAsString: Boolean): String = withOrigin(ctx) {
+    expression(ctx) match {
+      case Literal(null, _) if !legacyNullAsString => null
+      case l@Literal(null, _) => l.toString
+      case l: Literal =>
+        // TODO For v2 commands, we will cast the string back to its actual value,
+        //  which is a waste and can be improved in the future.
+        Cast(l, StringType, Some(conf.sessionLocalTimeZone)).eval().toString
+      case other =>
+        throw new IllegalArgumentException(s"Only literals are allowed in the " +
+          s"partition spec, but got ${other.sql}")
+    }
+  }
+
+  /**
+   * Add ORDER BY/SORT BY/CLUSTER BY/DISTRIBUTE BY/LIMIT/WINDOWS clauses to the logical plan. These
+   * clauses determine the shape (ordering/partitioning/rows) of the query result.
+   */
+  private def withQueryResultClauses(
+                                      ctx: QueryOrganizationContext,
+                                      query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
+    import ctx._
+
+    // Handle ORDER BY, SORT BY, DISTRIBUTE BY, and CLUSTER BY clause.
+    val withOrder = if (
+      !order.isEmpty && sort.isEmpty && distributeBy.isEmpty && clusterBy.isEmpty) {
+      // ORDER BY ...
+      Sort(order.asScala.map(visitSortItem).toSeq, global = true, query)
+    } else if (order.isEmpty && !sort.isEmpty && distributeBy.isEmpty && clusterBy.isEmpty) {
+      // SORT BY ...
+      Sort(sort.asScala.map(visitSortItem).toSeq, global = false, query)
+    } else if (order.isEmpty && sort.isEmpty && !distributeBy.isEmpty && clusterBy.isEmpty) {
+      // DISTRIBUTE BY ...
+      withRepartitionByExpression(ctx, expressionList(distributeBy), query)
+    } else if (order.isEmpty && !sort.isEmpty && !distributeBy.isEmpty && clusterBy.isEmpty) {
+      // SORT BY ... DISTRIBUTE BY ...
+      Sort(
+        sort.asScala.map(visitSortItem).toSeq,
+        global = false,
+        withRepartitionByExpression(ctx, expressionList(distributeBy), query))
+    } else if (order.isEmpty && sort.isEmpty && distributeBy.isEmpty && !clusterBy.isEmpty) {
+      // CLUSTER BY ...
+      val expressions = expressionList(clusterBy)
+      Sort(
+        expressions.map(SortOrder(_, Ascending)),
+        global = false,
+        withRepartitionByExpression(ctx, expressions, query))
+    } else if (order.isEmpty && sort.isEmpty && distributeBy.isEmpty && clusterBy.isEmpty) {
+      // [EMPTY]
+      query
+    } else {
+      throw new ParseException(
+        "Combination of ORDER BY/SORT BY/DISTRIBUTE BY/CLUSTER BY is not supported", ctx)
+    }
+
+    // WINDOWS
+    val withWindow = withOrder.optionalMap(windowClause)(withWindowClause)
+
+    // LIMIT
+    // - LIMIT ALL is the same as omitting the LIMIT clause
+    withWindow.optional(limit) {
+      Limit(typedVisit(limit), withWindow)
+    }
+  }
+
+  /**
+   * Create a clause for DISTRIBUTE BY.
+   */
+  protected def withRepartitionByExpression(
+                                             ctx: QueryOrganizationContext,
+                                             expressions: Seq[Expression],
+                                             query: LogicalPlan): LogicalPlan = {
+    RepartitionByExpression(expressions, query, None)
+  }
+
+  override def visitTransformQuerySpecification(
+                                                 ctx: TransformQuerySpecificationContext): LogicalPlan = withOrigin(ctx) {
+    val from = OneRowRelation().optional(ctx.fromClause) {
+      visitFromClause(ctx.fromClause)
+    }
+    withTransformQuerySpecification(
+      ctx,
+      ctx.transformClause,
+      ctx.lateralView,
+      ctx.whereClause,
+      ctx.aggregationClause,
+      ctx.havingClause,
+      ctx.windowClause,
+      from
+    )
+  }
+
+  override def visitRegularQuerySpecification(
+                                               ctx: RegularQuerySpecificationContext): LogicalPlan = withOrigin(ctx) {
+    val from = OneRowRelation().optional(ctx.fromClause) {
+      visitFromClause(ctx.fromClause)
+    }
+    withSelectQuerySpecification(
+      ctx,
+      ctx.selectClause,
+      ctx.lateralView,
+      ctx.whereClause,
+      ctx.aggregationClause,
+      ctx.havingClause,
+      ctx.windowClause,
+      from
+    )
+  }
+
+  override def visitNamedExpressionSeq(
+                                        ctx: NamedExpressionSeqContext): Seq[Expression] = {
+    Option(ctx).toSeq
+      .flatMap(_.namedExpression.asScala)
+      .map(typedVisit[Expression])
+  }
+
+  override def visitExpressionSeq(ctx: ExpressionSeqContext): Seq[Expression] = {
+    Option(ctx).toSeq
+      .flatMap(_.expression.asScala)
+      .map(typedVisit[Expression])
+  }
+
+  /**
+   * Create a logical plan using a having clause.
+   */
+  private def withHavingClause(
+                                ctx: HavingClauseContext, plan: LogicalPlan): LogicalPlan = {
+    // Note that we add a cast to non-predicate expressions. If the expression itself is
+    // already boolean, the optimizer will get rid of the unnecessary cast.
+    val predicate = expression(ctx.booleanExpression) match {
+      case p: Predicate => p
+      case e => Cast(e, BooleanType)
+    }
+    UnresolvedHaving(predicate, plan)
+  }
+
+  /**
+   * Create a logical plan using a where clause.
+   */
+  private def withWhereClause(ctx: WhereClauseContext, plan: LogicalPlan): LogicalPlan = {
+    Filter(expression(ctx.booleanExpression), plan)
+  }
+
+  /**
+   * Add a hive-style transform (SELECT TRANSFORM/MAP/REDUCE) query specification to a logical plan.
+   */
+  private def withTransformQuerySpecification(
+                                               ctx: ParserRuleContext,
+                                               transformClause: TransformClauseContext,
+                                               lateralView: java.util.List[LateralViewContext],
+                                               whereClause: WhereClauseContext,
+                                               aggregationClause: AggregationClauseContext,
+                                               havingClause: HavingClauseContext,
+                                               windowClause: WindowClauseContext,
+                                               relation: LogicalPlan): LogicalPlan = withOrigin(ctx) {
+    if (transformClause.setQuantifier != null) {
+      throw new ParseException("TRANSFORM does not support DISTINCT/ALL in inputs", transformClause.setQuantifier)
+    }
+    // Create the attributes.
+    val (attributes, schemaLess) = if (transformClause.colTypeList != null) {
+      // Typed return columns.
+      (createSchema(transformClause.colTypeList).toAttributes, false)
+    } else if (transformClause.identifierSeq != null) {
+      // Untyped return columns.
+      val attrs = visitIdentifierSeq(transformClause.identifierSeq).map { name =>
+        AttributeReference(name, StringType, nullable = true)()
+      }
+      (attrs, false)
+    } else {
+      (Seq(AttributeReference("key", StringType)(),
+        AttributeReference("value", StringType)()), true)
+    }
+
+    val plan = visitCommonSelectQueryClausePlan(
+      relation,
+      visitExpressionSeq(transformClause.expressionSeq),
+      lateralView,
+      whereClause,
+      aggregationClause,
+      havingClause,
+      windowClause,
+      isDistinct = false)
+
+    ScriptTransformation(
+      string(transformClause.script),
+      attributes,
+      plan,
+      withScriptIOSchema(
+        ctx,
+        transformClause.inRowFormat,
+        transformClause.recordWriter,
+        transformClause.outRowFormat,
+        transformClause.recordReader,
+        schemaLess
+      )
+    )
+  }
+
+  /**
+   * Add a regular (SELECT) query specification to a logical plan. The query specification
+   * is the core of the logical plan, this is where sourcing (FROM clause), projection (SELECT),
+   * aggregation (GROUP BY ... HAVING ...) and filtering (WHERE) takes place.
+   *
+   * Note that query hints are ignored (both by the parser and the builder).
+   */
+  private def withSelectQuerySpecification(
+                                            ctx: ParserRuleContext,
+                                            selectClause: SelectClauseContext,
+                                            lateralView: java.util.List[LateralViewContext],
+                                            whereClause: WhereClauseContext,
+                                            aggregationClause: AggregationClauseContext,
+                                            havingClause: HavingClauseContext,
+                                            windowClause: WindowClauseContext,
+                                            relation: LogicalPlan): LogicalPlan = withOrigin(ctx) {
+    val isDistinct = selectClause.setQuantifier() != null &&
+      selectClause.setQuantifier().DISTINCT() != null
+
+    val plan = visitCommonSelectQueryClausePlan(
+      relation,
+      visitNamedExpressionSeq(selectClause.namedExpressionSeq),
+      lateralView,
+      whereClause,
+      aggregationClause,
+      havingClause,
+      windowClause,
+      isDistinct)
+
+    // Hint
+    selectClause.hints.asScala.foldRight(plan)(withHints)
+  }
+
+  def visitCommonSelectQueryClausePlan(
+                                        relation: LogicalPlan,
+                                        expressions: Seq[Expression],
+                                        lateralView: java.util.List[LateralViewContext],
+                                        whereClause: WhereClauseContext,
+                                        aggregationClause: AggregationClauseContext,
+                                        havingClause: HavingClauseContext,
+                                        windowClause: WindowClauseContext,
+                                        isDistinct: Boolean): LogicalPlan = {
+    // Add lateral views.
+    val withLateralView = lateralView.asScala.foldLeft(relation)(withGenerate)
+
+    // Add where.
+    val withFilter = withLateralView.optionalMap(whereClause)(withWhereClause)
+
+    // Add aggregation or a project.
+    val namedExpressions = expressions.map {
+      case e: NamedExpression => e
+      case e: Expression => UnresolvedAlias(e)
+    }
+
+    def createProject() = if (namedExpressions.nonEmpty) {
+      Project(namedExpressions, withFilter)
+    } else {
+      withFilter
+    }
+
+    val withProject = if (aggregationClause == null && havingClause != null) {
+      if (conf.getConf(SQLConf.LEGACY_HAVING_WITHOUT_GROUP_BY_AS_WHERE)) {
+        // If the legacy conf is set, treat HAVING without GROUP BY as WHERE.
+        val predicate = expression(havingClause.booleanExpression) match {
+          case p: Predicate => p
+          case e => Cast(e, BooleanType)
+        }
+        Filter(predicate, createProject())
+      } else {
+        // According to SQL standard, HAVING without GROUP BY means global aggregate.
+        withHavingClause(havingClause, Aggregate(Nil, namedExpressions, withFilter))
+      }
+    } else if (aggregationClause != null) {
+      val aggregate = withAggregationClause(aggregationClause, namedExpressions, withFilter)
+      aggregate.optionalMap(havingClause)(withHavingClause)
+    } else {
+      // When hitting this branch, `having` must be null.
+      createProject()
+    }
+
+    // Distinct
+    val withDistinct = if (isDistinct) {
+      Distinct(withProject)
+    } else {
+      withProject
+    }
+
+    // Window
+    val withWindow = withDistinct.optionalMap(windowClause)(withWindowClause)
+
+    withWindow
+  }
+
+  // Script Transform's input/output format.
+  type ScriptIOFormat =
+    (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String])
+
+  protected def getRowFormatDelimited(ctx: RowFormatDelimitedContext): ScriptIOFormat = {
+    // TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema
+    // expects a seq of pairs in which the old parsers' token names are used as keys.
+    // Transforming the result of visitRowFormatDelimited would be quite a bit messier than
+    // retrieving the key value pairs ourselves.
+    val entries = entry("TOK_TABLEROWFORMATFIELD", ctx.fieldsTerminatedBy) ++
+      entry("TOK_TABLEROWFORMATCOLLITEMS", ctx.collectionItemsTerminatedBy) ++
+      entry("TOK_TABLEROWFORMATMAPKEYS", ctx.keysTerminatedBy) ++
+      entry("TOK_TABLEROWFORMATNULL", ctx.nullDefinedAs) ++
+      Option(ctx.linesSeparatedBy).toSeq.map { token =>
+        val value = string(token)
+        validate(
+          value == "\n",
+          s"LINES TERMINATED BY only supports newline '\\n' right now: $value",
+          ctx)
+        "TOK_TABLEROWFORMATLINES" -> value
+      }
+
+    (entries, None, Seq.empty, None)
+  }
+
+  /**
+   * Create a [[ScriptInputOutputSchema]].
+   */
+  protected def withScriptIOSchema(
+                                    ctx: ParserRuleContext,
+                                    inRowFormat: RowFormatContext,
+                                    recordWriter: Token,
+                                    outRowFormat: RowFormatContext,
+                                    recordReader: Token,
+                                    schemaLess: Boolean): ScriptInputOutputSchema = {
+
+    def format(fmt: RowFormatContext): ScriptIOFormat = fmt match {
+      case c: RowFormatDelimitedContext =>
+        getRowFormatDelimited(c)
+
+      case c: RowFormatSerdeContext =>
+        throw new ParseException("TRANSFORM with serde is only supported in hive mode", ctx)
+
+      // SPARK-32106: When there is no definition about format, we return empty result
+      // to use a built-in default Serde in SparkScriptTransformationExec.
+      case null =>
+        (Nil, None, Seq.empty, None)
+    }
+
+    val (inFormat, inSerdeClass, inSerdeProps, reader) = format(inRowFormat)
+
+    val (outFormat, outSerdeClass, outSerdeProps, writer) = format(outRowFormat)
+
+    ScriptInputOutputSchema(
+      inFormat, outFormat,
+      inSerdeClass, outSerdeClass,
+      inSerdeProps, outSerdeProps,
+      reader, writer,
+      schemaLess)
+  }
+
+  /**
+   * Create a logical plan for a given 'FROM' clause. Note that we support multiple (comma
+   * separated) relations here, these get converted into a single plan by condition-less inner join.
+   */
+  override def visitFromClause(ctx: FromClauseContext): LogicalPlan = withOrigin(ctx) {
+    val from = ctx.relation.asScala.foldLeft(null: LogicalPlan) { (left, relation) =>
+      val right = plan(relation.relationPrimary)
+      val join = right.optionalMap(left) { (left, right) =>
+        if (relation.LATERAL != null) {
+          if (!relation.relationPrimary.isInstanceOf[AliasedQueryContext]) {
+            throw new ParseException(s"LATERAL can only be used with subquery", relation.relationPrimary)
+          }
+          LateralJoin(left, LateralSubquery(right), Inner, None)
+        } else {
+          Join(left, right, Inner, None, JoinHint.NONE)
+        }
+      }
+      withJoinRelations(join, relation)
+    }
+    if (ctx.pivotClause() != null) {
+      if (!ctx.lateralView.isEmpty) {
+        throw new ParseException("LATERAL cannot be used together with PIVOT in FROM clause", ctx)
+      }
+      withPivot(ctx.pivotClause, from)
+    } else {
+      ctx.lateralView.asScala.foldLeft(from)(withGenerate)
+    }
+  }
+
+  /**
+   * Connect two queries by a Set operator.
+   *
+   * Supported Set operators are:
+   * - UNION [ DISTINCT | ALL ]
+   * - EXCEPT [ DISTINCT | ALL ]
+   * - MINUS [ DISTINCT | ALL ]
+   * - INTERSECT [DISTINCT | ALL]
+   */
+  override def visitSetOperation(ctx: SetOperationContext): LogicalPlan = withOrigin(ctx) {
+    val left = plan(ctx.left)
+    val right = plan(ctx.right)
+    val all = Option(ctx.setQuantifier()).exists(_.ALL != null)
+    ctx.operator.getType match {
+      case HoodieSqlBaseParser.UNION if all =>
+        Union(left, right)
+      case HoodieSqlBaseParser.UNION =>
+        Distinct(Union(left, right))
+      case HoodieSqlBaseParser.INTERSECT if all =>
+        Intersect(left, right, isAll = true)
+      case HoodieSqlBaseParser.INTERSECT =>
+        Intersect(left, right, isAll = false)
+      case HoodieSqlBaseParser.EXCEPT if all =>
+        Except(left, right, isAll = true)
+      case HoodieSqlBaseParser.EXCEPT =>
+        Except(left, right, isAll = false)
+      case HoodieSqlBaseParser.SETMINUS if all =>
+        Except(left, right, isAll = true)
+      case HoodieSqlBaseParser.SETMINUS =>
+        Except(left, right, isAll = false)
+    }
+  }
+
+  /**
+   * Add a [[WithWindowDefinition]] operator to a logical plan.
+   */
+  private def withWindowClause(
+                                ctx: WindowClauseContext,
+                                query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
+    // Collect all window specifications defined in the WINDOW clause.
+    val baseWindowTuples = ctx.namedWindow.asScala.map {
+      wCtx =>
+        (wCtx.name.getText, typedVisit[WindowSpec](wCtx.windowSpec))
+    }
+    baseWindowTuples.groupBy(_._1).foreach { kv =>
+      if (kv._2.size > 1) {
+        throw new ParseException(s"The definition of window '${kv._1}' is repetitive", ctx)
+      }
+    }
+    val baseWindowMap = baseWindowTuples.toMap
+
+    // Handle cases like
+    // window w1 as (partition by p_mfgr order by p_name
+    //               range between 2 preceding and 2 following),
+    //        w2 as w1
+    val windowMapView = baseWindowMap.mapValues {
+      case WindowSpecReference(name) =>
+        baseWindowMap.get(name) match {
+          case Some(spec: WindowSpecDefinition) =>
+            spec
+          case Some(ref) =>
+            throw new ParseException(s"Window reference '$name' is not a window specification", ctx)
+          case None =>
+            throw new ParseException(s"Cannot resolve window reference '$name'", ctx)
+        }
+      case spec: WindowSpecDefinition => spec
+    }
+
+    // Note that mapValues creates a view instead of materialized map. We force materialization by
+    // mapping over identity.
+    WithWindowDefinition(windowMapView.map(identity).toMap, query)
+  }
+
+  /**
+   * Add an [[Aggregate]] to a logical plan.
+   */
+  private def withAggregationClause(
+                                     ctx: AggregationClauseContext,
+                                     selectExpressions: Seq[NamedExpression],
+                                     query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
+    if (ctx.groupingExpressionsWithGroupingAnalytics.isEmpty) {
+      val groupByExpressions = expressionList(ctx.groupingExpressions)
+      if (ctx.GROUPING != null) {
+        // GROUP BY ... GROUPING SETS (...)
+        // `groupByExpressions` can be non-empty for Hive compatibility. It may add extra grouping
+        // expressions that do not exist in GROUPING SETS (...), and the value is always null.
+        // For example, `SELECT a, b, c FROM ... GROUP BY a, b, c GROUPING SETS (a, b)`, the output
+        // of column `c` is always null.
+        val groupingSets =
+        ctx.groupingSet.asScala.map(_.expression.asScala.map(e => expression(e)).toSeq)
+        Aggregate(Seq(GroupingSets(groupingSets.toSeq, groupByExpressions)),
+          selectExpressions, query)
+      } else {
+        // GROUP BY .... (WITH CUBE | WITH ROLLUP)?
+        val mappedGroupByExpressions = if (ctx.CUBE != null) {
+          Seq(Cube(groupByExpressions.map(Seq(_))))
+        } else if (ctx.ROLLUP != null) {
+          Seq(Rollup(groupByExpressions.map(Seq(_))))
+        } else {
+          groupByExpressions
+        }
+        Aggregate(mappedGroupByExpressions, selectExpressions, query)
+      }
+    } else {
+      val groupByExpressions =
+        ctx.groupingExpressionsWithGroupingAnalytics.asScala
+          .map(groupByExpr => {
+            val groupingAnalytics = groupByExpr.groupingAnalytics
+            if (groupingAnalytics != null) {
+              visitGroupingAnalytics(groupingAnalytics)
+            } else {
+              expression(groupByExpr.expression)
+            }
+          })
+      Aggregate(groupByExpressions.toSeq, selectExpressions, query)
+    }
+  }
+
+  override def visitGroupingAnalytics(
+                                       groupingAnalytics: GroupingAnalyticsContext): BaseGroupingSets = {
+    val groupingSets = groupingAnalytics.groupingSet.asScala
+      .map(_.expression.asScala.map(e => expression(e)).toSeq)
+    if (groupingAnalytics.CUBE != null) {
+      // CUBE(A, B, (A, B), ()) is not supported.
+      if (groupingSets.exists(_.isEmpty)) {
+        throw new ParseException(s"Empty set in CUBE grouping sets is not supported.", groupingAnalytics)
+      }
+      Cube(groupingSets.toSeq)
+    } else if (groupingAnalytics.ROLLUP != null) {
+      // ROLLUP(A, B, (A, B), ()) is not supported.
+      if (groupingSets.exists(_.isEmpty)) {
+        throw new ParseException(s"Empty set in ROLLUP grouping sets is not supported.", groupingAnalytics)
+      }
+      Rollup(groupingSets.toSeq)
+    } else {
+      assert(groupingAnalytics.GROUPING != null && groupingAnalytics.SETS != null)
+      val groupingSets = groupingAnalytics.groupingElement.asScala.flatMap { expr =>
+        val groupingAnalytics = expr.groupingAnalytics()
+        if (groupingAnalytics != null) {
+          visitGroupingAnalytics(groupingAnalytics).selectedGroupByExprs
+        } else {
+          Seq(expr.groupingSet().expression().asScala.map(e => expression(e)).toSeq)
+        }
+      }
+      GroupingSets(groupingSets.toSeq)
+    }
+  }
+
+  /**
+   * Add [[UnresolvedHint]]s to a logical plan.
+   */
+  private def withHints(
+                         ctx: HintContext,
+                         query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
+    var plan = query
+    ctx.hintStatements.asScala.reverse.foreach { stmt =>
+      plan = UnresolvedHint(stmt.hintName.getText,
+        stmt.parameters.asScala.map(expression).toSeq, plan)
+    }
+    plan
+  }
+
+  /**
+   * Add a [[Pivot]] to a logical plan.
+   */
+  private def withPivot(
+                         ctx: PivotClauseContext,
+                         query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
+    val aggregates = Option(ctx.aggregates).toSeq
+      .flatMap(_.namedExpression.asScala)
+      .map(typedVisit[Expression])
+    val pivotColumn = if (ctx.pivotColumn.identifiers.size == 1) {
+      UnresolvedAttribute.quoted(ctx.pivotColumn.identifier.getText)
+    } else {
+      CreateStruct(
+        ctx.pivotColumn.identifiers.asScala.map(
+          identifier => UnresolvedAttribute.quoted(identifier.getText)).toSeq)
+    }
+    val pivotValues = ctx.pivotValues.asScala.map(visitPivotValue)
+    Pivot(None, pivotColumn, pivotValues.toSeq, aggregates, query)
+  }
+
+  /**
+   * Create a Pivot column value with or without an alias.
+   */
+  override def visitPivotValue(ctx: PivotValueContext): Expression = withOrigin(ctx) {
+    val e = expression(ctx.expression)
+    if (ctx.identifier != null) {
+      Alias(e, ctx.identifier.getText)()
+    } else {
+      e
+    }
+  }
+
+  /**
+   * Add a [[Generate]] (Lateral View) to a logical plan.
+   */
+  private def withGenerate(
+                            query: LogicalPlan,
+                            ctx: LateralViewContext): LogicalPlan = withOrigin(ctx) {
+    val expressions = expressionList(ctx.expression)
+    Generate(
+      UnresolvedGenerator(visitFunctionName(ctx.qualifiedName), expressions),
+      unrequiredChildIndex = Nil,
+      outer = ctx.OUTER != null,
+      // scalastyle:off caselocale
+      Some(ctx.tblName.getText.toLowerCase),
+      // scalastyle:on caselocale
+      ctx.colName.asScala.map(_.getText).map(UnresolvedAttribute.quoted).toSeq,
+      query)
+  }
+
+  /**
+   * Create a single relation referenced in a FROM clause. This method is used when a part of the
+   * join condition is nested, for example:
+   * {{{
+   *   select * from t1 join (t2 cross join t3) on col1 = col2
+   * }}}
+   */
+  override def visitRelation(ctx: RelationContext): LogicalPlan = withOrigin(ctx) {
+    withJoinRelations(plan(ctx.relationPrimary), ctx)
+  }
+
+  /**
+   * Join one more [[LogicalPlan]]s to the current logical plan.
+   */
+  private def withJoinRelations(base: LogicalPlan, ctx: RelationContext): LogicalPlan = {
+    ctx.joinRelation.asScala.foldLeft(base) { (left, join) =>
+      withOrigin(join) {
+        val baseJoinType = join.joinType match {
+          case null => Inner
+          case jt if jt.CROSS != null => Cross
+          case jt if jt.FULL != null => FullOuter
+          case jt if jt.SEMI != null => LeftSemi
+          case jt if jt.ANTI != null => LeftAnti
+          case jt if jt.LEFT != null => LeftOuter
+          case jt if jt.RIGHT != null => RightOuter
+          case _ => Inner
+        }
+
+        if (join.LATERAL != null && !join.right.isInstanceOf[AliasedQueryContext]) {
+          throw new ParseException(s"LATERAL can only be used with subquery", join.right)
+        }
+
+        // Resolve the join type and join condition
+        val (joinType, condition) = Option(join.joinCriteria) match {
+          case Some(c) if c.USING != null =>
+            if (join.LATERAL != null) {
+              throw new ParseException("LATERAL join with USING join is not supported", ctx)
+            }
+            (UsingJoin(baseJoinType, visitIdentifierList(c.identifierList)), None)
+          case Some(c) if c.booleanExpression != null =>
+            (baseJoinType, Option(expression(c.booleanExpression)))
+          case Some(c) =>
+            throw new ParseException(s"Unimplemented joinCriteria: $c", ctx)
+          case None if join.NATURAL != null =>
+            if (join.LATERAL != null) {
+              throw new ParseException("LATERAL join with NATURAL join is not supported", ctx)
+            }
+            if (baseJoinType == Cross) {
+              throw new ParseException("NATURAL CROSS JOIN is not supported", ctx)
+            }
+            (NaturalJoin(baseJoinType), None)
+          case None =>
+            (baseJoinType, None)
+        }
+        if (join.LATERAL != null) {
+          if (!Seq(Inner, Cross, LeftOuter).contains(joinType)) {
+            throw new ParseException(s"Unsupported LATERAL join type ${joinType.toString}", ctx)
+          }
+          LateralJoin(left, LateralSubquery(plan(join.right)), joinType, condition)
+        } else {
+          Join(left, plan(join.right), joinType, condition, JoinHint.NONE)
+        }
+      }
+    }
+  }
+
+  /**
+   * Add a [[Sample]] to a logical plan.
+   *
+   * This currently supports the following sampling methods:
+   * - TABLESAMPLE(x ROWS): Sample the table down to the given number of rows.
+   * - TABLESAMPLE(x PERCENT): Sample the table down to the given percentage. Note that percentages
+   * are defined as a number between 0 and 100.
+   * - TABLESAMPLE(BUCKET x OUT OF y): Sample the table down to a 'x' divided by 'y' fraction.
+   */
+  private def withSample(ctx: SampleContext, query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
+    // Create a sampled plan if we need one.
+    def sample(fraction: Double): Sample = {
+      // The range of fraction accepted by Sample is [0, 1]. Because Hive's block sampling
+      // function takes X PERCENT as the input and the range of X is [0, 100], we need to
+      // adjust the fraction.
+      val eps = RandomSampler.roundingEpsilon
+      validate(fraction >= 0.0 - eps && fraction <= 1.0 + eps,
+        s"Sampling fraction ($fraction) must be on interval [0, 1]",
+        ctx)
+      Sample(0.0, fraction, withReplacement = false, (math.random * 1000).toInt, query)
+    }
+
+    if (ctx.sampleMethod() == null) {
+      throw new ParseException("TABLESAMPLE does not accept empty inputs.", ctx)
+    }
+
+    ctx.sampleMethod() match {
+      case ctx: SampleByRowsContext =>
+        Limit(expression(ctx.expression), query)
+
+      case ctx: SampleByPercentileContext =>
+        val fraction = ctx.percentage.getText.toDouble
+        val sign = if (ctx.negativeSign == null) 1 else -1
+        sample(sign * fraction / 100.0d)
+
+      case ctx: SampleByBytesContext =>
+        val bytesStr = ctx.bytes.getText
+        if (bytesStr.matches("[0-9]+[bBkKmMgG]")) {
+          throw new ParseException(s"TABLESAMPLE(byteLengthLiteral) is not supported", ctx)
+        } else {
+          throw new ParseException(s"$bytesStr is not a valid byte length literal, " +
+            "expected syntax: DIGIT+ ('B' | 'K' | 'M' | 'G')", ctx)
+        }
+
+      case ctx: SampleByBucketContext if ctx.ON() != null =>
+        if (ctx.identifier != null) {
+          throw new ParseException(s"TABLESAMPLE(BUCKET x OUT OF y ON colname) is not supported", ctx)
+        } else {
+          throw new ParseException(s"TABLESAMPLE(BUCKET x OUT OF y ON function) is not supported", ctx)
+        }
+
+      case ctx: SampleByBucketContext =>
+        sample(ctx.numerator.getText.toDouble / ctx.denominator.getText.toDouble)
+    }
+  }
+
+  /**
+   * Create a logical plan for a sub-query.
+   */
+  override def visitSubquery(ctx: SubqueryContext): LogicalPlan = withOrigin(ctx) {
+    plan(ctx.query)
+  }
+
+  /**
+   * Create an un-aliased table reference. This is typically used for top-level table references,
+   * for example:
+   * {{{
+   *   INSERT INTO db.tbl2
+   *   TABLE db.tbl1
+   * }}}
+   */
+  override def visitTable(ctx: TableContext): LogicalPlan = withOrigin(ctx) {
+    UnresolvedRelation(visitMultipartIdentifier(ctx.multipartIdentifier))
+  }
+
+  /**
+   * Create a table-valued function call with arguments, e.g. range(1000)
+   */
+  override def visitTableValuedFunction(ctx: TableValuedFunctionContext)
+  : LogicalPlan = withOrigin(ctx) {
+    val func = ctx.functionTable
+    val aliases = if (func.tableAlias.identifierList != null) {
+      visitIdentifierList(func.tableAlias.identifierList)
+    } else {
+      Seq.empty
+    }
+    val name = getFunctionIdentifier(func.functionName)
+    if (name.database.nonEmpty) {
+      operationNotAllowed(s"table valued function cannot specify database name: $name", ctx)
+    }
+
+    val tvf = UnresolvedTableValuedFunction(
+      name, func.expression.asScala.map(expression).toSeq, aliases)
+    tvf.optionalMap(func.tableAlias.strictIdentifier)(aliasPlan)
+  }
+
+  /**
+   * Create an inline table (a virtual table in Hive parlance).
+   */
+  override def visitInlineTable(ctx: InlineTableContext): LogicalPlan = withOrigin(ctx) {
+    // Get the backing expressions.
+    val rows = ctx.expression.asScala.map { e =>
+      expression(e) match {
+        // inline table comes in two styles:
+        // style 1: values (1), (2), (3)  -- multiple columns are supported
+        // style 2: values 1, 2, 3  -- only a single column is supported here
+        case struct: CreateNamedStruct => struct.valExprs // style 1
+        case child => Seq(child) // style 2
+      }
+    }
+
+    val aliases = if (ctx.tableAlias.identifierList != null) {
+      visitIdentifierList(ctx.tableAlias.identifierList)
+    } else {
+      Seq.tabulate(rows.head.size)(i => s"col${i + 1}")
+    }
+
+    val table = UnresolvedInlineTable(aliases, rows.toSeq)
+    table.optionalMap(ctx.tableAlias.strictIdentifier)(aliasPlan)
+  }
+
+  /**
+   * Create an alias (SubqueryAlias) for a join relation. This is practically the same as
+   * visitAliasedQuery and visitNamedExpression, ANTLR4 however requires us to use 3 different
+   * hooks. We could add alias names for output columns, for example:
+   * {{{
+   *   SELECT a, b, c, d FROM (src1 s1 INNER JOIN src2 s2 ON s1.id = s2.id) dst(a, b, c, d)
+   * }}}
+   */
+  override def visitAliasedRelation(ctx: AliasedRelationContext): LogicalPlan = withOrigin(ctx) {
+    val relation = plan(ctx.relation).optionalMap(ctx.sample)(withSample)
+    mayApplyAliasPlan(ctx.tableAlias, relation)
+  }
+
+  /**
+   * Create an alias (SubqueryAlias) for a sub-query. This is practically the same as
+   * visitAliasedRelation and visitNamedExpression, ANTLR4 however requires us to use 3 different
+   * hooks. We could add alias names for output columns, for example:
+   * {{{
+   *   SELECT col1, col2 FROM testData AS t(col1, col2)
+   * }}}
+   */
+  override def visitAliasedQuery(ctx: AliasedQueryContext): LogicalPlan = withOrigin(ctx) {
+    val relation = plan(ctx.query).optionalMap(ctx.sample)(withSample)
+    if (ctx.tableAlias.strictIdentifier == null) {
+      // For un-aliased subqueries, use a default alias name that is not likely to conflict with
+      // normal subquery names, so that parent operators can only access the columns in subquery by
+      // unqualified names. Users can still use this special qualifier to access columns if they
+      // know it, but that's not recommended.
+      SubqueryAlias("__auto_generated_subquery_name", relation)
+    } else {
+      mayApplyAliasPlan(ctx.tableAlias, relation)
+    }
+  }
+
+  /**
+   * Create an alias ([[SubqueryAlias]]) for a [[LogicalPlan]].
+   */
+  private def aliasPlan(alias: ParserRuleContext, plan: LogicalPlan): LogicalPlan = {
+    SubqueryAlias(alias.getText, plan)
+  }
+
+  /**
+   * If aliases specified in a FROM clause, create a subquery alias ([[SubqueryAlias]]) and
+   * column aliases for a [[LogicalPlan]].
+   */
+  private def mayApplyAliasPlan(tableAlias: TableAliasContext, plan: LogicalPlan): LogicalPlan = {
+    if (tableAlias.strictIdentifier != null) {
+      val alias = tableAlias.strictIdentifier.getText
+      if (tableAlias.identifierList != null) {
+        val columnNames = visitIdentifierList(tableAlias.identifierList)
+        SubqueryAlias(alias, UnresolvedSubqueryColumnAliases(columnNames, plan))
+      } else {
+        SubqueryAlias(alias, plan)
+      }
+    } else {
+      plan
+    }
+  }
+
+  /**
+   * Create a Sequence of Strings for a parenthesis enclosed alias list.
+   */
+  override def visitIdentifierList(ctx: IdentifierListContext): Seq[String] = withOrigin(ctx) {
+    visitIdentifierSeq(ctx.identifierSeq)
+  }
+
+  /**
+   * Create a Sequence of Strings for an identifier list.
+   */
+  override def visitIdentifierSeq(ctx: IdentifierSeqContext): Seq[String] = withOrigin(ctx) {
+    ctx.ident.asScala.map(_.getText).toSeq
+  }
+
+  /* ********************************************************************************************
+   * Table Identifier parsing
+   * ******************************************************************************************** */
+
+  /**
+   * Create a [[TableIdentifier]] from a 'tableName' or 'databaseName'.'tableName' pattern.
+   */
+  override def visitTableIdentifier(
+                                     ctx: TableIdentifierContext): TableIdentifier = withOrigin(ctx) {
+    TableIdentifier(ctx.table.getText, Option(ctx.db).map(_.getText))
+  }
+
+  /**
+   * Create a [[FunctionIdentifier]] from a 'functionName' or 'databaseName'.'functionName' pattern.
+   */
+  override def visitFunctionIdentifier(
+                                        ctx: FunctionIdentifierContext): FunctionIdentifier = withOrigin(ctx) {
+    FunctionIdentifier(ctx.function.getText, Option(ctx.db).map(_.getText))
+  }
+
+  /**
+   * Create a multi-part identifier.
+   */
+  override def visitMultipartIdentifier(ctx: MultipartIdentifierContext): Seq[String] =
+    withOrigin(ctx) {
+      ctx.parts.asScala.map(_.getText).toSeq
+    }
+
+  /* ********************************************************************************************
+   * Expression parsing
+   * ******************************************************************************************** */
+
+  /**
+   * Create an expression from the given context. This method just passes the context on to the
+   * visitor and only takes care of typing (We assume that the visitor returns an Expression here).
+   */
+  protected def expression(ctx: ParserRuleContext): Expression = typedVisit(ctx)
+
+  /**
+   * Create sequence of expressions from the given sequence of contexts.
+   */
+  private def expressionList(trees: java.util.List[ExpressionContext]): Seq[Expression] = {
+    trees.asScala.map(expression).toSeq
+  }
+
+  /**
+   * Create a star (i.e. all) expression; this selects all elements (in the specified object).
+   * Both un-targeted (global) and targeted aliases are supported.
+   */
+  override def visitStar(ctx: StarContext): Expression = withOrigin(ctx) {
+    UnresolvedStar(Option(ctx.qualifiedName()).map(_.identifier.asScala.map(_.getText).toSeq))
+  }
+
+  /**
+   * Create an aliased expression if an alias is specified. Both single and multi-aliases are
+   * supported.
+   */
+  override def visitNamedExpression(ctx: NamedExpressionContext): Expression = withOrigin(ctx) {
+    val e = expression(ctx.expression)
+    if (ctx.name != null) {
+      Alias(e, ctx.name.getText)()
+    } else if (ctx.identifierList != null) {
+      MultiAlias(e, visitIdentifierList(ctx.identifierList))
+    } else {
+      e
+    }
+  }
+
+  /**
+   * Combine a number of boolean expressions into a balanced expression tree. These expressions are
+   * either combined by a logical [[And]] or a logical [[Or]].
+   *
+   * A balanced binary tree is created because regular left recursive trees cause considerable
+   * performance degradations and can cause stack overflows.
+   */
+  override def visitLogicalBinary(ctx: LogicalBinaryContext): Expression = withOrigin(ctx) {
+    val expressionType = ctx.operator.getType
+    val expressionCombiner = expressionType match {
+      case HoodieSqlBaseParser.AND => And.apply _
+      case HoodieSqlBaseParser.OR => Or.apply _
+    }
+
+    // Collect all similar left hand contexts.
+    val contexts = ArrayBuffer(ctx.right)
+    var current = ctx.left
+
+    def collectContexts: Boolean = current match {
+      case lbc: LogicalBinaryContext if lbc.operator.getType == expressionType =>
+        contexts += lbc.right
+        current = lbc.left
+        true
+      case _ =>
+        contexts += current
+        false
+    }
+
+    while (collectContexts) {
+      // No body - all updates take place in the collectContexts.
+    }
+
+    // Reverse the contexts to have them in the same sequence as in the SQL statement & turn them
+    // into expressions.
+    val expressions = contexts.reverseMap(expression)
+
+    // Create a balanced tree.
+    def reduceToExpressionTree(low: Int, high: Int): Expression = high - low match {
+      case 0 =>
+        expressions(low)
+      case 1 =>
+        expressionCombiner(expressions(low), expressions(high))
+      case x =>
+        val mid = low + x / 2
+        expressionCombiner(
+          reduceToExpressionTree(low, mid),
+          reduceToExpressionTree(mid + 1, high))
+    }
+
+    reduceToExpressionTree(0, expressions.size - 1)
+  }
+
+  /**
+   * Invert a boolean expression.
+   */
+  override def visitLogicalNot(ctx: LogicalNotContext): Expression = withOrigin(ctx) {
+    Not(expression(ctx.booleanExpression()))
+  }
+
+  /**
+   * Create a filtering correlated sub-query (EXISTS).
+   */
+  override def visitExists(ctx: ExistsContext): Expression = {
+    Exists(plan(ctx.query))
+  }
+
+  /**
+   * Create a comparison expression. This compares two expressions. The following comparison
+   * operators are supported:
+   * - Equal: '=' or '=='
+   * - Null-safe Equal: '<=>'
+   * - Not Equal: '<>' or '!='
+   * - Less than: '<'
+   * - Less then or Equal: '<='
+   * - Greater than: '>'
+   * - Greater then or Equal: '>='
+   */
+  override def visitComparison(ctx: ComparisonContext): Expression = withOrigin(ctx) {
+    val left = expression(ctx.left)
+    val right = expression(ctx.right)
+    val operator = ctx.comparisonOperator().getChild(0).asInstanceOf[TerminalNode]
+    operator.getSymbol.getType match {
+      case HoodieSqlBaseParser.EQ =>
+        EqualTo(left, right)
+      case HoodieSqlBaseParser.NSEQ =>
+        EqualNullSafe(left, right)
+      case HoodieSqlBaseParser.NEQ | HoodieSqlBaseParser.NEQJ =>
+        Not(EqualTo(left, right))
+      case HoodieSqlBaseParser.LT =>
+        LessThan(left, right)
+      case HoodieSqlBaseParser.LTE =>
+        LessThanOrEqual(left, right)
+      case HoodieSqlBaseParser.GT =>
+        GreaterThan(left, right)
+      case HoodieSqlBaseParser.GTE =>
+        GreaterThanOrEqual(left, right)
+    }
+  }
+
+  /**
+   * Create a predicated expression. A predicated expression is a normal expression with a
+   * predicate attached to it, for example:
+   * {{{
+   *    a + 1 IS NULL
+   * }}}
+   */
+  override def visitPredicated(ctx: PredicatedContext): Expression = withOrigin(ctx) {
+    val e = expression(ctx.valueExpression)
+    if (ctx.predicate != null) {
+      withPredicate(e, ctx.predicate)
+    } else {
+      e
+    }
+  }
+
+  /**
+   * Add a predicate to the given expression. Supported expressions are:
+   * - (NOT) BETWEEN
+   * - (NOT) IN
+   * - (NOT) LIKE (ANY | SOME | ALL)
+   * - (NOT) RLIKE
+   * - IS (NOT) NULL.
+   * - IS (NOT) (TRUE | FALSE | UNKNOWN)
+   * - IS (NOT) DISTINCT FROM
+   */
+  private def withPredicate(e: Expression, ctx: PredicateContext): Expression = withOrigin(ctx) {
+    // Invert a predicate if it has a valid NOT clause.
+    def invertIfNotDefined(e: Expression): Expression = ctx.NOT match {
+      case null => e
+      case not => Not(e)
+    }
+
+    def getValueExpressions(e: Expression): Seq[Expression] = e match {
+      case c: CreateNamedStruct => c.valExprs
+      case other => Seq(other)
+    }
+
+    // Create the predicate.
+    ctx.kind.getType match {
+      case HoodieSqlBaseParser.BETWEEN =>
+        // BETWEEN is translated to lower <= e && e <= upper
+        invertIfNotDefined(And(
+          GreaterThanOrEqual(e, expression(ctx.lower)),
+          LessThanOrEqual(e, expression(ctx.upper))))
+      case HoodieSqlBaseParser.IN if ctx.query != null =>
+        invertIfNotDefined(InSubquery(getValueExpressions(e), ListQuery(plan(ctx.query))))
+      case HoodieSqlBaseParser.IN =>
+        invertIfNotDefined(In(e, ctx.expression.asScala.map(expression).toSeq))
+      case HoodieSqlBaseParser.LIKE =>
+        Option(ctx.quantifier).map(_.getType) match {
+          case Some(HoodieSqlBaseParser.ANY) | Some(HoodieSqlBaseParser.SOME) =>
+            validate(!ctx.expression.isEmpty, "Expected something between '(' and ')'.", ctx)
+            val expressions = expressionList(ctx.expression)
+            if (expressions.forall(_.foldable) && expressions.forall(_.dataType == StringType)) {
+              // If there are many pattern expressions, will throw StackOverflowError.
+              // So we use LikeAny or NotLikeAny instead.
+              val patterns = expressions.map(_.eval(EmptyRow).asInstanceOf[UTF8String])
+              ctx.NOT match {
+                case null => LikeAny(e, patterns)
+                case _ => NotLikeAny(e, patterns)
+              }
+            } else {
+              ctx.expression.asScala.map(expression)
+                .map(p => invertIfNotDefined(new Like(e, p))).toSeq.reduceLeft(Or)
+            }
+          case Some(HoodieSqlBaseParser.ALL) =>
+            validate(!ctx.expression.isEmpty, "Expected something between '(' and ')'.", ctx)
+            val expressions = expressionList(ctx.expression)
+            if (expressions.forall(_.foldable) && expressions.forall(_.dataType == StringType)) {
+              // If there are many pattern expressions, will throw StackOverflowError.
+              // So we use LikeAll or NotLikeAll instead.
+              val patterns = expressions.map(_.eval(EmptyRow).asInstanceOf[UTF8String])
+              ctx.NOT match {
+                case null => LikeAll(e, patterns)
+                case _ => NotLikeAll(e, patterns)
+              }
+            } else {
+              ctx.expression.asScala.map(expression)
+                .map(p => invertIfNotDefined(new Like(e, p))).toSeq.reduceLeft(And)
+            }
+          case _ =>
+            val escapeChar = Option(ctx.escapeChar).map(string).map { str =>
+              if (str.length != 1) {
+                throw new ParseException("Invalid escape string. Escape string must contain only one character.", ctx)
+              }
+              str.charAt(0)
+            }.getOrElse('\\')
+            invertIfNotDefined(Like(e, expression(ctx.pattern), escapeChar))
+        }
+      case HoodieSqlBaseParser.RLIKE =>
+        invertIfNotDefined(RLike(e, expression(ctx.pattern)))
+      case HoodieSqlBaseParser.NULL if ctx.NOT != null =>
+        IsNotNull(e)
+      case HoodieSqlBaseParser.NULL =>
+        IsNull(e)
+      case HoodieSqlBaseParser.TRUE => ctx.NOT match {
+        case null => EqualNullSafe(e, Literal(true))
+        case _ => Not(EqualNullSafe(e, Literal(true)))
+      }
+      case HoodieSqlBaseParser.FALSE => ctx.NOT match {
+        case null => EqualNullSafe(e, Literal(false))
+        case _ => Not(EqualNullSafe(e, Literal(false)))
+      }
+      case HoodieSqlBaseParser.UNKNOWN => ctx.NOT match {
+        case null => IsUnknown(e)
+        case _ => IsNotUnknown(e)
+      }
+      case HoodieSqlBaseParser.DISTINCT if ctx.NOT != null =>
+        EqualNullSafe(e, expression(ctx.right))
+      case HoodieSqlBaseParser.DISTINCT =>
+        Not(EqualNullSafe(e, expression(ctx.right)))
+    }
+  }
+
+  /**
+   * Create a binary arithmetic expression. The following arithmetic operators are supported:
+   * - Multiplication: '*'
+   * - Division: '/'
+   * - Hive Long Division: 'DIV'
+   * - Modulo: '%'
+   * - Addition: '+'
+   * - Subtraction: '-'
+   * - Binary AND: '&'
+   * - Binary XOR
+   * - Binary OR: '|'
+   */
+  override def visitArithmeticBinary(ctx: ArithmeticBinaryContext): Expression = withOrigin(ctx) {
+    val left = expression(ctx.left)
+    val right = expression(ctx.right)
+    ctx.operator.getType match {
+      case HoodieSqlBaseParser.ASTERISK =>
+        Multiply(left, right)
+      case HoodieSqlBaseParser.SLASH =>
+        Divide(left, right)
+      case HoodieSqlBaseParser.PERCENT =>
+        Remainder(left, right)
+      case HoodieSqlBaseParser.DIV =>
+        IntegralDivide(left, right)
+      case HoodieSqlBaseParser.PLUS =>
+        Add(left, right)
+      case HoodieSqlBaseParser.MINUS =>
+        Subtract(left, right)
+      case HoodieSqlBaseParser.CONCAT_PIPE =>
+        Concat(left :: right :: Nil)
+      case HoodieSqlBaseParser.AMPERSAND =>
+        BitwiseAnd(left, right)
+      case HoodieSqlBaseParser.HAT =>
+        BitwiseXor(left, right)
+      case HoodieSqlBaseParser.PIPE =>
+        BitwiseOr(left, right)
+    }
+  }
+
+  /**
+   * Create a unary arithmetic expression. The following arithmetic operators are supported:
+   * - Plus: '+'
+   * - Minus: '-'
+   * - Bitwise Not: '~'
+   */
+  override def visitArithmeticUnary(ctx: ArithmeticUnaryContext): Expression = withOrigin(ctx) {
+    val value = expression(ctx.valueExpression)
+    ctx.operator.getType match {
+      case HoodieSqlBaseParser.PLUS =>
+        UnaryPositive(value)
+      case HoodieSqlBaseParser.MINUS =>
+        UnaryMinus(value)
+      case HoodieSqlBaseParser.TILDE =>
+        BitwiseNot(value)
+    }
+  }
+
+  override def visitCurrentLike(ctx: CurrentLikeContext): Expression = withOrigin(ctx) {
+    if (conf.ansiEnabled) {
+      ctx.name.getType match {
+        case HoodieSqlBaseParser.CURRENT_DATE =>
+          CurrentDate()
+        case HoodieSqlBaseParser.CURRENT_TIMESTAMP =>
+          CurrentTimestamp()
+        case HoodieSqlBaseParser.CURRENT_USER =>
+          CurrentUser()
+      }
+    } else {
+      // If the parser is not in ansi mode, we should return `UnresolvedAttribute`, in case there
+      // are columns named `CURRENT_DATE` or `CURRENT_TIMESTAMP`.
+      UnresolvedAttribute.quoted(ctx.name.getText)
+    }
+  }
+
+  /**
+   * Create a [[Cast]] expression.
+   */
+  override def visitCast(ctx: CastContext): Expression = withOrigin(ctx) {
+    val rawDataType = typedVisit[DataType](ctx.dataType())
+    val dataType = CharVarcharUtils.replaceCharVarcharWithStringForCast(rawDataType)
+    val cast = ctx.name.getType match {
+      case HoodieSqlBaseParser.CAST =>
+        Cast(expression(ctx.expression), dataType)
+
+      case HoodieSqlBaseParser.TRY_CAST =>
+        TryCast(expression(ctx.expression), dataType)
+    }
+    cast.setTagValue(Cast.USER_SPECIFIED_CAST, true)
+    cast
+  }
+
+  /**
+   * Create a [[CreateStruct]] expression.
+   */
+  override def visitStruct(ctx: StructContext): Expression = withOrigin(ctx) {
+    CreateStruct.create(ctx.argument.asScala.map(expression).toSeq)
+  }
+
+  /**
+   * Create a [[First]] expression.
+   */
+  override def visitFirst(ctx: FirstContext): Expression = withOrigin(ctx) {
+    val ignoreNullsExpr = ctx.IGNORE != null
+    First(expression(ctx.expression), ignoreNullsExpr).toAggregateExpression()
+  }
+
+  /**
+   * Create a [[Last]] expression.
+   */
+  override def visitLast(ctx: LastContext): Expression = withOrigin(ctx) {
+    val ignoreNullsExpr = ctx.IGNORE != null
+    Last(expression(ctx.expression), ignoreNullsExpr).toAggregateExpression()
+  }
+
+  /**
+   * Create a Position expression.
+   */
+  override def visitPosition(ctx: PositionContext): Expression = withOrigin(ctx) {
+    new StringLocate(expression(ctx.substr), expression(ctx.str))
+  }
+
+  /**
+   * Create a Extract expression.
+   */
+  override def visitExtract(ctx: ExtractContext): Expression = withOrigin(ctx) {
+    val arguments = Seq(Literal(ctx.field.getText), expression(ctx.source))
+    UnresolvedFunction("extract", arguments, isDistinct = false)
+  }
+
+  /**
+   * Create a Substring/Substr expression.
+   */
+  override def visitSubstring(ctx: SubstringContext): Expression = withOrigin(ctx) {
+    if (ctx.len != null) {
+      Substring(expression(ctx.str), expression(ctx.pos), expression(ctx.len))
+    } else {
+      new Substring(expression(ctx.str), expression(ctx.pos))
+    }
+  }
+
+  /**
+   * Create a Trim expression.
+   */
+  override def visitTrim(ctx: TrimContext): Expression = withOrigin(ctx) {
+    val srcStr = expression(ctx.srcStr)
+    val trimStr = Option(ctx.trimStr).map(expression)
+    Option(ctx.trimOption).map(_.getType).getOrElse(HoodieSqlBaseParser.BOTH) match {
+      case HoodieSqlBaseParser.BOTH =>
+        StringTrim(srcStr, trimStr)
+      case HoodieSqlBaseParser.LEADING =>
+        StringTrimLeft(srcStr, trimStr)
+      case HoodieSqlBaseParser.TRAILING =>
+        StringTrimRight(srcStr, trimStr)
+      case other =>
+        throw new ParseException("Function trim doesn't support with " +
+          s"type $other. Please use BOTH, LEADING or TRAILING as trim type", ctx)
+    }
+  }
+
+  /**
+   * Create a Overlay expression.
+   */
+  override def visitOverlay(ctx: OverlayContext): Expression = withOrigin(ctx) {
+    val input = expression(ctx.input)
+    val replace = expression(ctx.replace)
+    val position = expression(ctx.position)
+    val lengthOpt = Option(ctx.length).map(expression)
+    lengthOpt match {
+      case Some(length) => Overlay(input, replace, position, length)
+      case None => new Overlay(input, replace, position)
+    }
+  }
+
+  /**
+   * Create a (windowed) Function expression.
+   */
+  override def visitFunctionCall(ctx: FunctionCallContext): Expression = withOrigin(ctx) {
+    // Create the function call.
+    val name = ctx.functionName.getText
+    val isDistinct = Option(ctx.setQuantifier()).exists(_.DISTINCT != null)
+    // Call `toSeq`, otherwise `ctx.argument.asScala.map(expression)` is `Buffer` in Scala 2.13
+    val arguments = ctx.argument.asScala.map(expression).toSeq match {
+      case Seq(UnresolvedStar(None))
+        if name.toLowerCase(Locale.ROOT) == "count" && !isDistinct =>
+        // Transform COUNT(*) into COUNT(1).
+        Seq(Literal(1))
+      case expressions =>
+        expressions
+    }
+    val filter = Option(ctx.where).map(expression(_))
+    val ignoreNulls =
+      Option(ctx.nullsOption).map(_.getType == HoodieSqlBaseParser.IGNORE).getOrElse(false)
+    val function = UnresolvedFunction(
+      getFunctionMultiparts(ctx.functionName), arguments, isDistinct, filter, ignoreNulls)
+
+    // Check if the function is evaluated in a windowed context.
+    ctx.windowSpec match {
+      case spec: WindowRefContext =>
+        UnresolvedWindowExpression(function, visitWindowRef(spec))
+      case spec: WindowDefContext =>
+        WindowExpression(function, visitWindowDef(spec))
+      case _ => function
+    }
+  }
+
+  /**
+   * Create a function database (optional) and name pair.
+   */
+  protected def visitFunctionName(ctx: QualifiedNameContext): FunctionIdentifier = {
+    visitFunctionName(ctx, ctx.identifier().asScala.map(_.getText).toSeq)
+  }
+
+  /**
+   * Create a function database (optional) and name pair.
+   */
+  private def visitFunctionName(ctx: ParserRuleContext, texts: Seq[String]): FunctionIdentifier = {
+    texts match {
+      case Seq(db, fn) => FunctionIdentifier(fn, Option(db))
+      case Seq(fn) => FunctionIdentifier(fn, None)
+      case other =>
+        throw new ParseException(s"Unsupported function name '${texts.mkString(".")}'", ctx)
+    }
+  }
+
+  /**
+   * Get a function identifier consist by database (optional) and name.
+   */
+  protected def getFunctionIdentifier(ctx: FunctionNameContext): FunctionIdentifier = {
+    if (ctx.qualifiedName != null) {
+      visitFunctionName(ctx.qualifiedName)
+    } else {
+      FunctionIdentifier(ctx.getText, None)
+    }
+  }
+
+  protected def getFunctionMultiparts(ctx: FunctionNameContext): Seq[String] = {
+    if (ctx.qualifiedName != null) {
+      ctx.qualifiedName().identifier().asScala.map(_.getText).toSeq
+    } else {
+      Seq(ctx.getText)
+    }
+  }
+
+  /**
+   * Create an [[LambdaFunction]].
+   */
+  override def visitLambda(ctx: LambdaContext): Expression = withOrigin(ctx) {
+    val arguments = ctx.identifier().asScala.map { name =>
+      UnresolvedNamedLambdaVariable(UnresolvedAttribute.quoted(name.getText).nameParts)
+    }
+    val function = expression(ctx.expression).transformUp {
+      case a: UnresolvedAttribute => UnresolvedNamedLambdaVariable(a.nameParts)
+    }
+    LambdaFunction(function, arguments.toSeq)
+  }
+
+  /**
+   * Create a reference to a window frame, i.e. [[WindowSpecReference]].
+   */
+  override def visitWindowRef(ctx: WindowRefContext): WindowSpecReference = withOrigin(ctx) {
+    WindowSpecReference(ctx.name.getText)
+  }
+
+  /**
+   * Create a window definition, i.e. [[WindowSpecDefinition]].
+   */
+  override def visitWindowDef(ctx: WindowDefContext): WindowSpecDefinition = withOrigin(ctx) {
+    // CLUSTER BY ... | PARTITION BY ... ORDER BY ...
+    val partition = ctx.partition.asScala.map(expression)
+    val order = ctx.sortItem.asScala.map(visitSortItem)
+
+    // RANGE/ROWS BETWEEN ...
+    val frameSpecOption = Option(ctx.windowFrame).map { frame =>
+      val frameType = frame.frameType.getType match {
+        case HoodieSqlBaseParser.RANGE => RangeFrame
+        case HoodieSqlBaseParser.ROWS => RowFrame
+      }
+
+      SpecifiedWindowFrame(
+        frameType,
+        visitFrameBound(frame.start),
+        Option(frame.end).map(visitFrameBound).getOrElse(CurrentRow))
+    }
+
+    WindowSpecDefinition(
+      partition.toSeq,
+      order.toSeq,
+      frameSpecOption.getOrElse(UnspecifiedFrame))
+  }
+
+  /**
+   * Create or resolve a frame boundary expressions.
+   */
+  override def visitFrameBound(ctx: FrameBoundContext): Expression = withOrigin(ctx) {
+    def value: Expression = {
+      val e = expression(ctx.expression)
+      validate(e.resolved && e.foldable, "Frame bound value must be a literal.", ctx)
+      e
+    }
+
+    ctx.boundType.getType match {
+      case HoodieSqlBaseParser.PRECEDING if ctx.UNBOUNDED != null =>
+        UnboundedPreceding
+      case HoodieSqlBaseParser.PRECEDING =>
+        UnaryMinus(value)
+      case HoodieSqlBaseParser.CURRENT =>
+        CurrentRow
+      case HoodieSqlBaseParser.FOLLOWING if ctx.UNBOUNDED != null =>
+        UnboundedFollowing
+      case HoodieSqlBaseParser.FOLLOWING =>
+        value
+    }
+  }
+
+  /**
+   * Create a [[CreateStruct]] expression.
+   */
+  override def visitRowConstructor(ctx: RowConstructorContext): Expression = withOrigin(ctx) {
+    CreateStruct(ctx.namedExpression().asScala.map(expression).toSeq)
+  }
+
+  /**
+   * Create a [[ScalarSubquery]] expression.
+   */
+  override def visitSubqueryExpression(
+                                        ctx: SubqueryExpressionContext): Expression = withOrigin(ctx) {
+    ScalarSubquery(plan(ctx.query))
+  }
+
+  /**
+   * Create a value based [[CaseWhen]] expression. This has the following SQL form:
+   * {{{
+   *   CASE [expression]
+   *    WHEN [value] THEN [expression]
+   *    ...
+   *    ELSE [expression]
+   *   END
+   * }}}
+   */
+  override def visitSimpleCase(ctx: SimpleCaseContext): Expression = withOrigin(ctx) {
+    val e = expression(ctx.value)
+    val branches = ctx.whenClause.asScala.map { wCtx =>
+      (EqualTo(e, expression(wCtx.condition)), expression(wCtx.result))
+    }
+    CaseWhen(branches.toSeq, Option(ctx.elseExpression).map(expression))
+  }
+
+  /**
+   * Create a condition based [[CaseWhen]] expression. This has the following SQL syntax:
+   * {{{
+   *   CASE
+   *    WHEN [predicate] THEN [expression]
+   *    ...
+   *    ELSE [expression]
+   *   END
+   * }}}
+   *
+   * @param ctx the parse tree
+   * */
+  override def visitSearchedCase(ctx: SearchedCaseContext): Expression = withOrigin(ctx) {
+    val branches = ctx.whenClause.asScala.map { wCtx =>
+      (expression(wCtx.condition), expression(wCtx.result))
+    }
+    CaseWhen(branches.toSeq, Option(ctx.elseExpression).map(expression))
+  }
+
+  /**
+   * Currently only regex in expressions of SELECT statements are supported; in other
+   * places, e.g., where `(a)?+.+` = 2, regex are not meaningful.
+   */
+  private def canApplyRegex(ctx: ParserRuleContext): Boolean = withOrigin(ctx) {
+    var parent = ctx.getParent
+    var rtn = false
+    while (parent != null) {
+      if (parent.isInstanceOf[NamedExpressionContext]) {
+        rtn = true
+      }
+      parent = parent.getParent
+    }
+    rtn
+  }
+
+  /**
+   * Create a dereference expression. The return type depends on the type of the parent.
+   * If the parent is an [[UnresolvedAttribute]], it can be a [[UnresolvedAttribute]] or
+   * a [[UnresolvedRegex]] for regex quoted in ``; if the parent is some other expression,
+   * it can be [[UnresolvedExtractValue]].
+   */
+  override def visitDereference(ctx: DereferenceContext): Expression = withOrigin(ctx) {
+    val attr = ctx.fieldName.getText
+    expression(ctx.base) match {
+      case unresolved_attr@UnresolvedAttribute(nameParts) =>
+        ctx.fieldName.getStart.getText match {
+          case escapedIdentifier(columnNameRegex)
+            if conf.supportQuotedRegexColumnName && canApplyRegex(ctx) =>
+            UnresolvedRegex(columnNameRegex, Some(unresolved_attr.name),
+              conf.caseSensitiveAnalysis)
+          case _ =>
+            UnresolvedAttribute(nameParts :+ attr)
+        }
+      case e =>
+        UnresolvedExtractValue(e, Literal(attr))
+    }
+  }
+
+  /**
+   * Create an [[UnresolvedAttribute]] expression or a [[UnresolvedRegex]] if it is a regex
+   * quoted in ``
+   */
+  override def visitColumnReference(ctx: ColumnReferenceContext): Expression = withOrigin(ctx) {
+    ctx.getStart.getText match {
+      case escapedIdentifier(columnNameRegex)
+        if conf.supportQuotedRegexColumnName && canApplyRegex(ctx) =>
+        UnresolvedRegex(columnNameRegex, None, conf.caseSensitiveAnalysis)
+      case _ =>
+        UnresolvedAttribute.quoted(ctx.getText)
+    }
+
+  }
+
+  /**
+   * Create an [[UnresolvedExtractValue]] expression, this is used for subscript access to an array.
+   */
+  override def visitSubscript(ctx: SubscriptContext): Expression = withOrigin(ctx) {
+    UnresolvedExtractValue(expression(ctx.value), expression(ctx.index))
+  }
+
+  /**
+   * Create an expression for an expression between parentheses. This is need because the ANTLR
+   * visitor cannot automatically convert the nested context into an expression.
+   */
+  override def visitParenthesizedExpression(
+                                             ctx: ParenthesizedExpressionContext): Expression = withOrigin(ctx) {
+    expression(ctx.expression)
+  }
+
+  /**
+   * Create a [[SortOrder]] expression.
+   */
+  override def visitSortItem(ctx: SortItemContext): SortOrder = withOrigin(ctx) {
+    val direction = if (ctx.DESC != null) {
+      Descending
+    } else {
+      Ascending
+    }
+    val nullOrdering = if (ctx.FIRST != null) {
+      NullsFirst
+    } else if (ctx.LAST != null) {
+      NullsLast
+    } else {
+      direction.defaultNullOrdering
+    }
+    SortOrder(expression(ctx.expression), direction, nullOrdering, Seq.empty)
+  }
+
+  /**
+   * Create a typed Literal expression. A typed literal has the following SQL syntax:
+   * {{{
+   *   [TYPE] '[VALUE]'
+   * }}}
+   * Currently Date, Timestamp, Interval and Binary typed literals are supported.
+   */
+  override def visitTypeConstructor(ctx: TypeConstructorContext): Literal = withOrigin(ctx) {
+    val value = string(ctx.STRING)
+    val valueType = ctx.identifier.getText.toUpperCase(Locale.ROOT)
+
+    def toLiteral[T](f: UTF8String => Option[T], t: DataType): Literal = {
+      f(UTF8String.fromString(value)).map(Literal(_, t)).getOrElse {
+        throw new ParseException(s"Cannot parse the $valueType value: $value", ctx)
+      }
+    }
+
+    def constructTimestampLTZLiteral(value: String): Literal = {
+      val zoneId = getZoneId(conf.sessionLocalTimeZone)
+      val specialTs = convertSpecialTimestamp(value, zoneId).map(Literal(_, TimestampType))
+      specialTs.getOrElse(toLiteral(stringToTimestamp(_, zoneId), TimestampType))
+    }
+
+    try {
+      valueType match {
+        case "DATE" =>
+          val zoneId = getZoneId(conf.sessionLocalTimeZone)
+          val specialDate = convertSpecialDate(value, zoneId).map(Literal(_, DateType))
+          specialDate.getOrElse(toLiteral(stringToDate, DateType))
+        // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+        case "TIMESTAMP_NTZ" if isTesting =>
+          convertSpecialTimestampNTZ(value, getZoneId(conf.sessionLocalTimeZone))
+            .map(Literal(_, TimestampNTZType))
+            .getOrElse(toLiteral(stringToTimestampWithoutTimeZone, TimestampNTZType))
+        case "TIMESTAMP_LTZ" if isTesting =>
+          constructTimestampLTZLiteral(value)
+        case "TIMESTAMP" =>
+          SQLConf.get.timestampType match {
+            case TimestampNTZType =>
+              convertSpecialTimestampNTZ(value, getZoneId(conf.sessionLocalTimeZone))
+                .map(Literal(_, TimestampNTZType))
+                .getOrElse {
+                  val containsTimeZonePart =
+                    DateTimeUtils.parseTimestampString(UTF8String.fromString(value))._2.isDefined
+                  // If the input string contains time zone part, return a timestamp with local time
+                  // zone literal.
+                  if (containsTimeZonePart) {
+                    constructTimestampLTZLiteral(value)
+                  } else {
+                    toLiteral(stringToTimestampWithoutTimeZone, TimestampNTZType)
+                  }
+                }
+
+            case TimestampType =>
+              constructTimestampLTZLiteral(value)
+          }
+
+        case "INTERVAL" =>
+          val interval = try {
+            IntervalUtils.stringToInterval(UTF8String.fromString(value))
+          } catch {
+            case e: IllegalArgumentException =>
+              val ex = new ParseException(s"Cannot parse the INTERVAL value: $value", ctx)
+              ex.setStackTrace(e.getStackTrace)
+              throw ex
+          }
+          if (!conf.legacyIntervalEnabled) {
+            val units = value
+              .split("\\s")
+              .map(_.toLowerCase(Locale.ROOT).stripSuffix("s"))
+              .filter(s => s != "interval" && s.matches("[a-z]+"))
+            constructMultiUnitsIntervalLiteral(ctx, interval, units)
+          } else {
+            Literal(interval, CalendarIntervalType)
+          }
+        case "X" =>
+          val padding = if (value.length % 2 != 0) "0" else ""
+          Literal(DatatypeConverter.parseHexBinary(padding + value))
+        case other =>
+          throw new ParseException(s"Literals of type '$other' are currently not supported.", ctx)
+      }
+    } catch {
+      case e: IllegalArgumentException =>
+        val message = Option(e.getMessage).getOrElse(s"Exception parsing $valueType")
+        throw new ParseException(message, ctx)
+    }
+  }
+
+  /**
+   * Create a NULL literal expression.
+   */
+  override def visitNullLiteral(ctx: NullLiteralContext): Literal = withOrigin(ctx) {
+    Literal(null)
+  }
+
+  /**
+   * Create a Boolean literal expression.
+   */
+  override def visitBooleanLiteral(ctx: BooleanLiteralContext): Literal = withOrigin(ctx) {
+    if (ctx.getText.toBoolean) {
+      Literal.TrueLiteral
+    } else {
+      Literal.FalseLiteral
+    }
+  }
+
+  /**
+   * Create an integral literal expression. The code selects the most narrow integral type
+   * possible, either a BigDecimal, a Long or an Integer is returned.
+   */
+  override def visitIntegerLiteral(ctx: IntegerLiteralContext): Literal = withOrigin(ctx) {
+    BigDecimal(ctx.getText) match {
+      case v if v.isValidInt =>
+        Literal(v.intValue)
+      case v if v.isValidLong =>
+        Literal(v.longValue)
+      case v => Literal(v.underlying())
+    }
+  }
+
+  /**
+   * Create a decimal literal for a regular decimal number.
+   */
+  override def visitDecimalLiteral(ctx: DecimalLiteralContext): Literal = withOrigin(ctx) {
+    Literal(BigDecimal(ctx.getText).underlying())
+  }
+
+  /**
+   * Create a decimal literal for a regular decimal number or a scientific decimal number.
+   */
+  override def visitLegacyDecimalLiteral(
+                                          ctx: LegacyDecimalLiteralContext): Literal = withOrigin(ctx) {
+    Literal(BigDecimal(ctx.getText).underlying())
+  }
+
+  /**
+   * Create a double literal for number with an exponent, e.g. 1E-30
+   */
+  override def visitExponentLiteral(ctx: ExponentLiteralContext): Literal = {
+    numericLiteral(ctx, ctx.getText, /* exponent values don't have a suffix */
+      Double.MinValue, Double.MaxValue, DoubleType.simpleString)(_.toDouble)
+  }
+
+  /** Create a numeric literal expression. */
+  private def numericLiteral(
+                              ctx: NumberContext,
+                              rawStrippedQualifier: String,
+                              minValue: BigDecimal,
+                              maxValue: BigDecimal,
+                              typeName: String)(converter: String => Any): Literal = withOrigin(ctx) {
+    try {
+      val rawBigDecimal = BigDecimal(rawStrippedQualifier)
+      if (rawBigDecimal < minValue || rawBigDecimal > maxValue) {
+        throw new ParseException(s"Numeric literal $rawStrippedQualifier does not " +
+          s"fit in range [$minValue, $maxValue] for type $typeName", ctx)
+      }
+      Literal(converter(rawStrippedQualifier))
+    } catch {
+      case e: NumberFormatException =>
+        throw new ParseException(e.getMessage, ctx)
+    }
+  }
+
+  /**
+   * Create a Byte Literal expression.
+   */
+  override def visitTinyIntLiteral(ctx: TinyIntLiteralContext): Literal = {
+    val rawStrippedQualifier = ctx.getText.substring(0, ctx.getText.length - 1)
+    numericLiteral(ctx, rawStrippedQualifier,
+      Byte.MinValue, Byte.MaxValue, ByteType.simpleString)(_.toByte)
+  }
+
+  /**
+   * Create a Short Literal expression.
+   */
+  override def visitSmallIntLiteral(ctx: SmallIntLiteralContext): Literal = {
+    val rawStrippedQualifier = ctx.getText.substring(0, ctx.getText.length - 1)
+    numericLiteral(ctx, rawStrippedQualifier,
+      Short.MinValue, Short.MaxValue, ShortType.simpleString)(_.toShort)
+  }
+
+  /**
+   * Create a Long Literal expression.
+   */
+  override def visitBigIntLiteral(ctx: BigIntLiteralContext): Literal = {
+    val rawStrippedQualifier = ctx.getText.substring(0, ctx.getText.length - 1)
+    numericLiteral(ctx, rawStrippedQualifier,
+      Long.MinValue, Long.MaxValue, LongType.simpleString)(_.toLong)
+  }
+
+  /**
+   * Create a Float Literal expression.
+   */
+  override def visitFloatLiteral(ctx: FloatLiteralContext): Literal = {
+    val rawStrippedQualifier = ctx.getText.substring(0, ctx.getText.length - 1)
+    numericLiteral(ctx, rawStrippedQualifier,
+      Float.MinValue, Float.MaxValue, FloatType.simpleString)(_.toFloat)
+  }
+
+  /**
+   * Create a Double Literal expression.
+   */
+  override def visitDoubleLiteral(ctx: DoubleLiteralContext): Literal = {
+    val rawStrippedQualifier = ctx.getText.substring(0, ctx.getText.length - 1)
+    numericLiteral(ctx, rawStrippedQualifier,
+      Double.MinValue, Double.MaxValue, DoubleType.simpleString)(_.toDouble)
+  }
+
+  /**
+   * Create a BigDecimal Literal expression.
+   */
+  override def visitBigDecimalLiteral(ctx: BigDecimalLiteralContext): Literal = {
+    val raw = ctx.getText.substring(0, ctx.getText.length - 2)
+    try {
+      Literal(BigDecimal(raw).underlying())
+    } catch {
+      case e: AnalysisException =>
+        throw new ParseException(e.message, ctx)
+    }
+  }
+
+  /**
+   * Create a String literal expression.
+   */
+  override def visitStringLiteral(ctx: StringLiteralContext): Literal = withOrigin(ctx) {
+    Literal(createString(ctx))
+  }
+
+  /**
+   * Create a String from a string literal context. This supports multiple consecutive string
+   * literals, these are concatenated, for example this expression "'hello' 'world'" will be
+   * converted into "helloworld".
+   *
+   * Special characters can be escaped by using Hive/C-style escaping.
+   */
+  private def createString(ctx: StringLiteralContext): String = {
+    if (conf.escapedStringLiterals) {
+      ctx.STRING().asScala.map(stringWithoutUnescape).mkString
+    } else {
+      ctx.STRING().asScala.map(string).mkString
+    }
+  }
+
+  /**
+   * Create an [[UnresolvedRelation]] from a multi-part identifier context.
+   */
+  private def createUnresolvedRelation(
+                                        ctx: MultipartIdentifierContext): UnresolvedRelation = withOrigin(ctx) {
+    UnresolvedRelation(visitMultipartIdentifier(ctx))
+  }
+
+  /**
+   * Construct an [[Literal]] from [[CalendarInterval]] and
+   * units represented as a [[Seq]] of [[String]].
+   */
+  private def constructMultiUnitsIntervalLiteral(
+                                                  ctx: ParserRuleContext,
+                                                  calendarInterval: CalendarInterval,
+                                                  units: Seq[String]): Literal = {
+    var yearMonthFields = Set.empty[Byte]
+    var dayTimeFields = Set.empty[Byte]
+    for (unit <- units) {
+      if (YearMonthIntervalType.stringToField.contains(unit)) {
+        yearMonthFields += YearMonthIntervalType.stringToField(unit)
+      } else if (DayTimeIntervalType.stringToField.contains(unit)) {
+        dayTimeFields += DayTimeIntervalType.stringToField(unit)
+      } else if (unit == "week") {
+        dayTimeFields += DayTimeIntervalType.DAY
+      } else {
+        assert(unit == "millisecond" || unit == "microsecond")
+        dayTimeFields += DayTimeIntervalType.SECOND
+      }
+    }
+    if (yearMonthFields.nonEmpty) {
+      if (dayTimeFields.nonEmpty) {
+        val literalStr = source(ctx)
+        throw new ParseException(s"Cannot mix year-month and day-time fields: $literalStr", ctx)
+      }
+      Literal(
+        calendarInterval.months,
+        YearMonthIntervalType(yearMonthFields.min, yearMonthFields.max)
+      )
+    } else {
+      Literal(
+        IntervalUtils.getDuration(calendarInterval, TimeUnit.MICROSECONDS),
+        DayTimeIntervalType(dayTimeFields.min, dayTimeFields.max))
+    }
+  }
+
+  /**
+   * Create a [[CalendarInterval]] or ANSI interval literal expression.
+   * Two syntaxes are supported:
+   * - multiple unit value pairs, for instance: interval 2 months 2 days.
+   * - from-to unit, for instance: interval '1-2' year to month.
+   */
+  override def visitInterval(ctx: IntervalContext): Literal = withOrigin(ctx) {
+    val calendarInterval = parseIntervalLiteral(ctx)
+    if (ctx.errorCapturingUnitToUnitInterval != null && !conf.legacyIntervalEnabled) {
+      // Check the `to` unit to distinguish year-month and day-time intervals because
+      // `CalendarInterval` doesn't have enough info. For instance, new CalendarInterval(0, 0, 0)
+      // can be derived from INTERVAL '0-0' YEAR TO MONTH as well as from
+      // INTERVAL '0 00:00:00' DAY TO SECOND.
+      val fromUnit =
+      ctx.errorCapturingUnitToUnitInterval.body.from.getText.toLowerCase(Locale.ROOT)
+      val toUnit = ctx.errorCapturingUnitToUnitInterval.body.to.getText.toLowerCase(Locale.ROOT)
+      if (toUnit == "month") {
+        assert(calendarInterval.days == 0 && calendarInterval.microseconds == 0)
+        val start = YearMonthIntervalType.stringToField(fromUnit)
+        Literal(calendarInterval.months, YearMonthIntervalType(start, YearMonthIntervalType.MONTH))
+      } else {
+        assert(calendarInterval.months == 0)
+        val micros = IntervalUtils.getDuration(calendarInterval, TimeUnit.MICROSECONDS)
+        val start = DayTimeIntervalType.stringToField(fromUnit)
+        val end = DayTimeIntervalType.stringToField(toUnit)
+        Literal(micros, DayTimeIntervalType(start, end))
+      }
+    } else if (ctx.errorCapturingMultiUnitsInterval != null && !conf.legacyIntervalEnabled) {
+      val units =
+        ctx.errorCapturingMultiUnitsInterval.body.unit.asScala.map(
+          _.getText.toLowerCase(Locale.ROOT).stripSuffix("s")).toSeq
+      constructMultiUnitsIntervalLiteral(ctx, calendarInterval, units)
+    } else {
+      Literal(calendarInterval, CalendarIntervalType)
+    }
+  }
+
+  /**
+   * Create a [[CalendarInterval]] object
+   */
+  protected def parseIntervalLiteral(ctx: IntervalContext): CalendarInterval = withOrigin(ctx) {
+    if (ctx.errorCapturingMultiUnitsInterval != null) {
+      val innerCtx = ctx.errorCapturingMultiUnitsInterval
+      if (innerCtx.unitToUnitInterval != null) {
+        throw new ParseException("Can only have a single from-to unit in the interval literal syntax", innerCtx.unitToUnitInterval)
+      }
+      visitMultiUnitsInterval(innerCtx.multiUnitsInterval)
+    } else if (ctx.errorCapturingUnitToUnitInterval != null) {
+      val innerCtx = ctx.errorCapturingUnitToUnitInterval
+      if (innerCtx.error1 != null || innerCtx.error2 != null) {
+        val errorCtx = if (innerCtx.error1 != null) innerCtx.error1 else innerCtx.error2
+        throw new ParseException("Can only have a single from-to unit in the interval literal syntax", errorCtx)
+      }
+      visitUnitToUnitInterval(innerCtx.body)
+    } else {
+      throw new ParseException("at least one time unit should be given for interval literal", ctx)
+    }
+  }
+
+  /**
+   * Creates a [[CalendarInterval]] with multiple unit value pairs, e.g. 1 YEAR 2 DAYS.
+   */
+  override def visitMultiUnitsInterval(ctx: MultiUnitsIntervalContext): CalendarInterval = {
+    withOrigin(ctx) {
+      val units = ctx.unit.asScala
+      val values = ctx.intervalValue().asScala
+      try {
+        assert(units.length == values.length)
+        val kvs = units.indices.map { i =>
+          val u = units(i).getText
+          val v = if (values(i).STRING() != null) {
+            val value = string(values(i).STRING())
+            // SPARK-32840: For invalid cases, e.g. INTERVAL '1 day 2' hour,
+            // INTERVAL 'interval 1' day, we need to check ahead before they are concatenated with
+            // units and become valid ones, e.g. '1 day 2 hour'.
+            // Ideally, we only ensure the value parts don't contain any units here.
+            if (value.exists(Character.isLetter)) {
+              throw new ParseException("Can only use numbers in the interval value part for" +
+                s" multiple unit value pairs interval form, but got invalid value: $value", ctx)
+            }
+            if (values(i).MINUS() == null) {
+              value
+            } else {
+              value.startsWith("-") match {
+                case true => value.replaceFirst("-", "")
+                case false => s"-$value"
+              }
+            }
+          } else {
+            values(i).getText
+          }
+          UTF8String.fromString(" " + v + " " + u)
+        }
+        IntervalUtils.stringToInterval(UTF8String.concat(kvs: _*))
+      } catch {
+        case i: IllegalArgumentException =>
+          val e = new ParseException(i.getMessage, ctx)
+          e.setStackTrace(i.getStackTrace)
+          throw e
+      }
+    }
+  }
+
+  /**
+   * Creates a [[CalendarInterval]] with from-to unit, e.g. '2-1' YEAR TO MONTH.
+   */
+  override def visitUnitToUnitInterval(ctx: UnitToUnitIntervalContext): CalendarInterval = {
+    withOrigin(ctx) {
+      val value = Option(ctx.intervalValue.STRING).map(string).map { interval =>
+        if (ctx.intervalValue().MINUS() == null) {
+          interval
+        } else {
+          interval.startsWith("-") match {
+            case true => interval.replaceFirst("-", "")
+            case false => s"-$interval"
+          }
+        }
+      }.getOrElse {
+        throw new ParseException("The value of from-to unit must be a string", ctx.intervalValue)
+      }
+      try {
+        val from = ctx.from.getText.toLowerCase(Locale.ROOT)
+        val to = ctx.to.getText.toLowerCase(Locale.ROOT)
+        (from, to) match {
+          case ("year", "month") =>
+            IntervalUtils.fromYearMonthString(value)
+          case ("day", "hour") | ("day", "minute") | ("day", "second") | ("hour", "minute") |
+               ("hour", "second") | ("minute", "second") =>
+            IntervalUtils.fromDayTimeString(value,
+              DayTimeIntervalType.stringToField(from), DayTimeIntervalType.stringToField(to))
+          case _ =>
+            throw new ParseException(s"Intervals FROM $from TO $to are not supported.", ctx)
+        }
+      } catch {
+        // Handle Exceptions thrown by CalendarInterval
+        case e: IllegalArgumentException =>
+          val pe = new ParseException(e.getMessage, ctx)
+          pe.setStackTrace(e.getStackTrace)
+          throw pe
+      }
+    }
+  }
+
+  /* ********************************************************************************************
+   * DataType parsing
+   * ******************************************************************************************** */
+
+  /**
+   * Resolve/create a primitive type.
+   */
+  override def visitPrimitiveDataType(ctx: PrimitiveDataTypeContext): DataType = withOrigin(ctx) {
+    val dataType = ctx.identifier.getText.toLowerCase(Locale.ROOT)
+    (dataType, ctx.INTEGER_VALUE().asScala.toList) match {
+      case ("boolean", Nil) => BooleanType
+      case ("tinyint" | "byte", Nil) => ByteType
+      case ("smallint" | "short", Nil) => ShortType
+      case ("int" | "integer", Nil) => IntegerType
+      case ("bigint" | "long", Nil) => LongType
+      case ("float" | "real", Nil) => FloatType
+      case ("double", Nil) => DoubleType
+      case ("date", Nil) => DateType
+      case ("timestamp", Nil) => SQLConf.get.timestampType
+      // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+      case ("timestamp_ntz", Nil) if isTesting => TimestampNTZType
+      case ("timestamp_ltz", Nil) if isTesting => TimestampType
+      case ("string", Nil) => StringType
+      case ("character" | "char", length :: Nil) => CharType(length.getText.toInt)
+      case ("varchar", length :: Nil) => VarcharType(length.getText.toInt)
+      case ("binary", Nil) => BinaryType
+      case ("decimal" | "dec" | "numeric", Nil) => DecimalType.USER_DEFAULT
+      case ("decimal" | "dec" | "numeric", precision :: Nil) =>
+        DecimalType(precision.getText.toInt, 0)
+      case ("decimal" | "dec" | "numeric", precision :: scale :: Nil) =>
+        DecimalType(precision.getText.toInt, scale.getText.toInt)
+      case ("void", Nil) => NullType
+      case ("interval", Nil) => CalendarIntervalType
+      case (dt, params) =>
+        val dtStr = if (params.nonEmpty) s"$dt(${params.mkString(",")})" else dt
+        throw new ParseException(s"DataType $dtStr is not supported.", ctx)
+    }
+  }
+
+  override def visitYearMonthIntervalDataType(ctx: YearMonthIntervalDataTypeContext): DataType = {
+    val startStr = ctx.from.getText.toLowerCase(Locale.ROOT)
+    val start = YearMonthIntervalType.stringToField(startStr)
+    if (ctx.to != null) {
+      val endStr = ctx.to.getText.toLowerCase(Locale.ROOT)
+      val end = YearMonthIntervalType.stringToField(endStr)
+      if (end <= start) {
+        throw new ParseException(s"Intervals FROM $startStr TO $endStr are not supported.", ctx)
+      }
+      YearMonthIntervalType(start, end)
+    } else {
+      YearMonthIntervalType(start)
+    }
+  }
+
+  override def visitDayTimeIntervalDataType(ctx: DayTimeIntervalDataTypeContext): DataType = {
+    val startStr = ctx.from.getText.toLowerCase(Locale.ROOT)
+    val start = DayTimeIntervalType.stringToField(startStr)
+    if (ctx.to != null) {
+      val endStr = ctx.to.getText.toLowerCase(Locale.ROOT)
+      val end = DayTimeIntervalType.stringToField(endStr)
+      if (end <= start) {
+        throw new ParseException(s"Intervals FROM $startStr TO $endStr are not supported.", ctx)
+      }
+      DayTimeIntervalType(start, end)
+    } else {
+      DayTimeIntervalType(start)
+    }
+  }
+
+  /**
+   * Create a complex DataType. Arrays, Maps and Structures are supported.
+   */
+  override def visitComplexDataType(ctx: ComplexDataTypeContext): DataType = withOrigin(ctx) {
+    ctx.complex.getType match {
+      case HoodieSqlBaseParser.ARRAY =>
+        ArrayType(typedVisit(ctx.dataType(0)))
+      case HoodieSqlBaseParser.MAP =>
+        MapType(typedVisit(ctx.dataType(0)), typedVisit(ctx.dataType(1)))
+      case HoodieSqlBaseParser.STRUCT =>
+        StructType(Option(ctx.complexColTypeList).toSeq.flatMap(visitComplexColTypeList))
+    }
+  }
+
+  /**
+   * Create top level table schema.
+   */
+  protected def createSchema(ctx: ColTypeListContext): StructType = {
+    StructType(Option(ctx).toSeq.flatMap(visitColTypeList))
+  }
+
+  /**
+   * Create a [[StructType]] from a number of column definitions.
+   */
+  override def visitColTypeList(ctx: ColTypeListContext): Seq[StructField] = withOrigin(ctx) {
+    ctx.colType().asScala.map(visitColType).toSeq
+  }
+
+  /**
+   * Create a top level [[StructField]] from a column definition.
+   */
+  override def visitColType(ctx: ColTypeContext): StructField = withOrigin(ctx) {
+    import ctx._
+
+    val builder = new MetadataBuilder
+    // Add comment to metadata
+    Option(commentSpec()).map(visitCommentSpec).foreach {
+      builder.putString("comment", _)
+    }
+
+    StructField(
+      name = colName.getText,
+      dataType = typedVisit[DataType](ctx.dataType),
+      nullable = NULL == null,
+      metadata = builder.build())
+  }
+
+  /**
+   * Create a [[StructType]] from a sequence of [[StructField]]s.
+   */
+  protected def createStructType(ctx: ComplexColTypeListContext): StructType = {
+    StructType(Option(ctx).toSeq.flatMap(visitComplexColTypeList))
+  }
+
+  /**
+   * Create a [[StructType]] from a number of column definitions.
+   */
+  override def visitComplexColTypeList(
+                                        ctx: ComplexColTypeListContext): Seq[StructField] = withOrigin(ctx) {
+    ctx.complexColType().asScala.map(visitComplexColType).toSeq
+  }
+
+  /**
+   * Create a [[StructField]] from a column definition.
+   */
+  override def visitComplexColType(ctx: ComplexColTypeContext): StructField = withOrigin(ctx) {
+    import ctx._
+    val structField = StructField(
+      name = identifier.getText,
+      dataType = typedVisit(dataType()),
+      nullable = NULL == null)
+    Option(commentSpec).map(visitCommentSpec).map(structField.withComment).getOrElse(structField)
+  }
+
+  /**
+   * Create a location string.
+   */
+  override def visitLocationSpec(ctx: LocationSpecContext): String = withOrigin(ctx) {
+    string(ctx.STRING)
+  }
+
+  /**
+   * Create an optional location string.
+   */
+  protected def visitLocationSpecList(ctx: java.util.List[LocationSpecContext]): Option[String] = {
+    ctx.asScala.headOption.map(visitLocationSpec)
+  }
+
+  /**
+   * Create a comment string.
+   */
+  override def visitCommentSpec(ctx: CommentSpecContext): String = withOrigin(ctx) {
+    string(ctx.STRING)
+  }
+
+  /**
+   * Create an optional comment string.
+   */
+  protected def visitCommentSpecList(ctx: java.util.List[CommentSpecContext]): Option[String] = {
+    ctx.asScala.headOption.map(visitCommentSpec)
+  }
+
+  /**
+   * Create a [[BucketSpec]].
+   */
+  override def visitBucketSpec(ctx: BucketSpecContext): BucketSpec = withOrigin(ctx) {
+    BucketSpec(
+      ctx.INTEGER_VALUE.getText.toInt,
+      visitIdentifierList(ctx.identifierList),
+      Option(ctx.orderedIdentifierList)
+        .toSeq
+        .flatMap(_.orderedIdentifier.asScala)
+        .map { orderedIdCtx =>
+          Option(orderedIdCtx.ordering).map(_.getText).foreach { dir =>
+            if (dir.toLowerCase(Locale.ROOT) != "asc") {
+              operationNotAllowed(s"Column ordering must be ASC, was '$dir'", ctx)
+            }
+          }
+
+          orderedIdCtx.ident.getText
+        })
+  }
+
+  /**
+   * Convert a table property list into a key-value map.
+   * This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]].
+   */
+  override def visitTablePropertyList(
+                                       ctx: TablePropertyListContext): Map[String, String] = withOrigin(ctx) {
+    val properties = ctx.tableProperty.asScala.map { property =>
+      val key = visitTablePropertyKey(property.key)
+      val value = visitTablePropertyValue(property.value)
+      key -> value
+    }
+    // Check for duplicate property names.
+    checkDuplicateKeys(properties.toSeq, ctx)
+    properties.toMap
+  }
+
+  /**
+   * Parse a key-value map from a [[TablePropertyListContext]], assuming all values are specified.
+   */
+  def visitPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String] = {
+    val props = visitTablePropertyList(ctx)
+    val badKeys = props.collect { case (key, null) => key }
+    if (badKeys.nonEmpty) {
+      operationNotAllowed(
+        s"Values must be specified for key(s): ${badKeys.mkString("[", ",", "]")}", ctx)
+    }
+    props
+  }
+
+  /**
+   * Parse a list of keys from a [[TablePropertyListContext]], assuming no values are specified.
+   */
+  def visitPropertyKeys(ctx: TablePropertyListContext): Seq[String] = {
+    val props = visitTablePropertyList(ctx)
+    val badKeys = props.filter { case (_, v) => v != null }.keys
+    if (badKeys.nonEmpty) {
+      operationNotAllowed(
+        s"Values should not be specified for key(s): ${badKeys.mkString("[", ",", "]")}", ctx)
+    }
+    props.keys.toSeq
+  }
+
+  /**
+   * A table property key can either be String or a collection of dot separated elements. This
+   * function extracts the property key based on whether its a string literal or a table property
+   * identifier.
+   */
+  override def visitTablePropertyKey(key: TablePropertyKeyContext): String = {
+    if (key.STRING != null) {
+      string(key.STRING)
+    } else {
+      key.getText
+    }
+  }
+
+  /**
+   * A table property value can be String, Integer, Boolean or Decimal. This function extracts
+   * the property value based on whether its a string, integer, boolean or decimal literal.
+   */
+  override def visitTablePropertyValue(value: TablePropertyValueContext): String = {
+    if (value == null) {
+      null
+    } else if (value.STRING != null) {
+      string(value.STRING)
+    } else if (value.booleanValue != null) {
+      value.getText.toLowerCase(Locale.ROOT)
+    } else {
+      value.getText
+    }
+  }
+
+  /**
+   * Type to keep track of a table header: (identifier, isTemporary, ifNotExists, isExternal).
+   */
+  type TableHeader = (Seq[String], Boolean, Boolean, Boolean)
+
+  /**
+   * Type to keep track of table clauses:
+   * - partition transforms
+   * - partition columns
+   * - bucketSpec
+   * - properties
+   * - options
+   * - location
+   * - comment
+   * - serde
+   *
+   * Note: Partition transforms are based on existing table schema definition. It can be simple
+   * column names, or functions like `year(date_col)`. Partition columns are column names with data
+   * types like `i INT`, which should be appended to the existing table schema.
+   */
+  type TableClauses = (
+    Seq[Transform], Seq[StructField], Option[BucketSpec], Map[String, String],
+      Map[String, String], Option[String], Option[String], Option[SerdeInfo])
+
+  /**
+   * Validate a create table statement and return the [[TableIdentifier]].
+   */
+  override def visitCreateTableHeader(
+                                       ctx: CreateTableHeaderContext): TableHeader = withOrigin(ctx) {
+    val temporary = ctx.TEMPORARY != null
+    val ifNotExists = ctx.EXISTS != null
+    if (temporary && ifNotExists) {
+      operationNotAllowed("CREATE TEMPORARY TABLE ... IF NOT EXISTS", ctx)
+    }
+    val multipartIdentifier = ctx.multipartIdentifier.parts.asScala.map(_.getText).toSeq
+    (multipartIdentifier, temporary, ifNotExists, ctx.EXTERNAL != null)
+  }
+
+  /**
+   * Validate a replace table statement and return the [[TableIdentifier]].
+   */
+  override def visitReplaceTableHeader(
+                                        ctx: ReplaceTableHeaderContext): TableHeader = withOrigin(ctx) {
+    val multipartIdentifier = ctx.multipartIdentifier.parts.asScala.map(_.getText).toSeq
+    (multipartIdentifier, false, false, false)
+  }
+
+  /**
+   * Parse a qualified name to a multipart name.
+   */
+  override def visitQualifiedName(ctx: QualifiedNameContext): Seq[String] = withOrigin(ctx) {
+    ctx.identifier.asScala.map(_.getText).toSeq
+  }
+
+  /**
+   * Parse a list of transforms or columns.
+   */
+  override def visitPartitionFieldList(
+                                        ctx: PartitionFieldListContext): (Seq[Transform], Seq[StructField]) = withOrigin(ctx) {
+    val (transforms, columns) = ctx.fields.asScala.map {
+      case transform: PartitionTransformContext =>
+        (Some(visitPartitionTransform(transform)), None)
+      case field: PartitionColumnContext =>
+        (None, Some(visitColType(field.colType)))
+    }.unzip
+
+    (transforms.flatten.toSeq, columns.flatten.toSeq)
+  }
+
+  override def visitPartitionTransform(
+                                        ctx: PartitionTransformContext): Transform = withOrigin(ctx) {
+    def getFieldReference(
+                           ctx: ApplyTransformContext,
+                           arg: V2Expression): FieldReference = {
+      lazy val name: String = ctx.identifier.getText
+      arg match {
+        case ref: FieldReference =>
+          ref
+        case nonRef =>
+          throw new ParseException(s"Expected a column reference for transform $name: $nonRef.describe", ctx)
+      }
+    }
+
+    def getSingleFieldReference(
+                                 ctx: ApplyTransformContext,
+                                 arguments: Seq[V2Expression]): FieldReference = {
+      lazy val name: String = ctx.identifier.getText
+      if (arguments.size > 1) {
+        throw new ParseException(s"Too many arguments for transform $name", ctx)
+      } else if (arguments.isEmpty) {
+        throw
+
+          new ParseException(s"Not enough arguments for transform $name", ctx)
+      } else {
+        getFieldReference(ctx, arguments.head)
+      }
+    }
+
+    ctx.transform match {
+      case identityCtx: IdentityTransformContext =>
+        IdentityTransform(FieldReference(typedVisit[Seq[String]](identityCtx.qualifiedName)))
+
+      case applyCtx: ApplyTransformContext =>
+        val arguments = applyCtx.argument.asScala.map(visitTransformArgument).toSeq
+
+        applyCtx.identifier.getText match {
+          case "bucket" =>
+            val numBuckets: Int = arguments.head match {
+              case LiteralValue(shortValue, ShortType) =>
+                shortValue.asInstanceOf[Short].toInt
+              case LiteralValue(intValue, IntegerType) =>
+                intValue.asInstanceOf[Int]
+              case LiteralValue(longValue, LongType) =>
+                longValue.asInstanceOf[Long].toInt
+              case lit =>
+                throw new ParseException(s"Invalid number of buckets: ${lit.describe}", applyCtx)
+            }
+
+            val fields = arguments.tail.map(arg => getFieldReference(applyCtx, arg))
+
+            BucketTransform(LiteralValue(numBuckets, IntegerType), fields)
+
+          case "years" =>
+            YearsTransform(getSingleFieldReference(applyCtx, arguments))
+
+          case "months" =>
+            MonthsTransform(getSingleFieldReference(applyCtx, arguments))
+
+          case "days" =>
+            DaysTransform(getSingleFieldReference(applyCtx, arguments))
+
+          case "hours" =>
+            HoursTransform(getSingleFieldReference(applyCtx, arguments))
+
+          case name =>
+            ApplyTransform(name, arguments)
+        }
+    }
+  }
+
+  /**
+   * Parse an argument to a transform. An argument may be a field reference (qualified name) or
+   * a value literal.
+   */
+  override def visitTransformArgument(ctx: TransformArgumentContext): V2Expression = {
+    withOrigin(ctx) {
+      val reference = Option(ctx.qualifiedName)
+        .map(typedVisit[Seq[String]])
+        .map(FieldReference(_))
+      val literal = Option(ctx.constant)
+        .map(typedVisit[Literal])
+        .map(lit => LiteralValue(lit.value, lit.dataType))
+      reference.orElse(literal)
+        .getOrElse(throw new ParseException("Invalid transform argument", ctx))
+    }
+  }
+
+  def cleanTableProperties(
+                            ctx: ParserRuleContext, properties: Map[String, String]): Map[String, String] = {
+    import TableCatalog._
+    val legacyOn = conf.getConf(SQLConf.LEGACY_PROPERTY_NON_RESERVED)
+    properties.filter {
+      case (PROP_PROVIDER, _) if !legacyOn =>
+        throw new ParseException(s"$PROP_PROVIDER is a reserved table property, please use the USING clause to specify it.", ctx)
+      case (PROP_PROVIDER, _) => false
+      case (PROP_LOCATION, _) if !legacyOn =>
+        throw new ParseException(s"$PROP_LOCATION is a reserved table property, please use the LOCATION clause to specify it.", ctx)
+      case (PROP_LOCATION, _) => false
+      case (PROP_OWNER, _) if !legacyOn =>
+        throw new ParseException(s"$PROP_OWNER is a reserved table property, it will be set to the current user.", ctx)
+      case (PROP_OWNER, _) => false
+      case _ => true
+    }
+  }
+
+  def cleanTableOptions(
+                         ctx: ParserRuleContext,
+                         options: Map[String, String],
+                         location: Option[String]): (Map[String, String], Option[String]) = {
+    var path = location
+    val filtered = cleanTableProperties(ctx, options).filter {
+      case (k, v) if k.equalsIgnoreCase("path") && path.nonEmpty =>
+        throw new ParseException(s"Duplicated table paths found: '${path.get}' and '$v'. LOCATION" +
+          s" and the case insensitive key 'path' in OPTIONS are all used to indicate the custom" +
+          s" table path, you can only specify one of them.", ctx)
+      case (k, v) if k.equalsIgnoreCase("path") =>
+        path = Some(v)
+        false
+      case _ => true
+    }
+    (filtered, path)
+  }
+
+  /**
+   * Create a [[SerdeInfo]] for creating tables.
+   *
+   * Format: STORED AS (name | INPUTFORMAT input_format OUTPUTFORMAT output_format)
+   */
+  override def visitCreateFileFormat(ctx: CreateFileFormatContext): SerdeInfo = withOrigin(ctx) {
+    (ctx.fileFormat, ctx.storageHandler) match {
+      // Expected format: INPUTFORMAT input_format OUTPUTFORMAT output_format
+      case (c: TableFileFormatContext, null) =>
+        SerdeInfo(formatClasses = Some(FormatClasses(string(c.inFmt), string(c.outFmt))))
+      // Expected format: SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET | AVRO
+      case (c: GenericFileFormatContext, null) =>
+        SerdeInfo(storedAs = Some(c.identifier.getText))
+      case (null, storageHandler) =>
+        operationNotAllowed("STORED BY", ctx)
+      case _ =>
+        throw new ParseException("Expected either STORED AS or STORED BY, not both", ctx)
+    }
+  }
+
+  /**
+   * Create a [[SerdeInfo]] used for creating tables.
+   *
+   * Example format:
+   * {{{
+   *   SERDE serde_name [WITH SERDEPROPERTIES (k1=v1, k2=v2, ...)]
+   * }}}
+   *
+   * OR
+   *
+   * {{{
+   *   DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]]
+   *   [COLLECTION ITEMS TERMINATED BY char]
+   *   [MAP KEYS TERMINATED BY char]
+   *   [LINES TERMINATED BY char]
+   *   [NULL DEFINED AS char]
+   * }}}
+   */
+  def visitRowFormat(ctx: RowFormatContext): SerdeInfo = withOrigin(ctx) {
+    ctx match {
+      case serde: RowFormatSerdeContext => visitRowFormatSerde(serde)
+      case delimited: RowFormatDelimitedContext => visitRowFormatDelimited(delimited)
+    }
+  }
+
+  /**
+   * Create SERDE row format name and properties pair.
+   */
+  override def visitRowFormatSerde(ctx: RowFormatSerdeContext): SerdeInfo = withOrigin(ctx) {
+    import ctx._
+    SerdeInfo(
+      serde = Some(string(name)),
+      serdeProperties = Option(tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty))
+  }
+
+  /**
+   * Create a delimited row format properties object.
+   */
+  override def visitRowFormatDelimited(
+                                        ctx: RowFormatDelimitedContext): SerdeInfo = withOrigin(ctx) {
+    // Collect the entries if any.
+    def entry(key: String, value: Token): Seq[(String, String)] = {
+      Option(value).toSeq.map(x => key -> string(x))
+    }
+
+    // TODO we need proper support for the NULL format.
+    val entries =
+      entry("field.delim", ctx.fieldsTerminatedBy) ++
+        entry("serialization.format", ctx.fieldsTerminatedBy) ++
+        entry("escape.delim", ctx.escapedBy) ++
+        // The following typo is inherited from Hive...
+        entry("colelction.delim", ctx.collectionItemsTerminatedBy) ++
+        entry("mapkey.delim", ctx.keysTerminatedBy) ++
+        Option(ctx.linesSeparatedBy).toSeq.map { token =>
+          val value = string(token)
+          validate(
+            value == "\n",
+            s"LINES TERMINATED BY only supports newline '\\n' right now: $value",
+            ctx)
+          "line.delim" -> value
+        }
+    SerdeInfo(serdeProperties = entries.toMap)
+  }
+
+  /**
+   * Throw a [[ParseException]] if the user specified incompatible SerDes through ROW FORMAT
+   * and STORED AS.
+   *
+   * The following are allowed. Anything else is not:
+   * ROW FORMAT SERDE ... STORED AS [SEQUENCEFILE | RCFILE | TEXTFILE]
+   * ROW FORMAT DELIMITED ... STORED AS TEXTFILE
+   * ROW FORMAT ... STORED AS INPUTFORMAT ... OUTPUTFORMAT ...
+   */
+  protected def validateRowFormatFileFormat(
+                                             rowFormatCtx: RowFormatContext,
+                                             createFileFormatCtx: CreateFileFormatContext,
+                                             parentCtx: ParserRuleContext): Unit = {
+    if (!(rowFormatCtx == null || createFileFormatCtx == null)) {
+      (rowFormatCtx, createFileFormatCtx.fileFormat) match {
+        case (_, ffTable: TableFileFormatContext) => // OK
+        case (rfSerde: RowFormatSerdeContext, ffGeneric: GenericFileFormatContext) =>
+          ffGeneric.identifier.getText.toLowerCase(Locale.ROOT) match {
+            case ("sequencefile" | "textfile" | "rcfile") => // OK
+            case fmt =>
+              operationNotAllowed(
+                s"ROW FORMAT SERDE is incompatible with format '$fmt', which also specifies a serde",
+                parentCtx)
+          }
+        case (rfDelimited: RowFormatDelimitedContext, ffGeneric: GenericFileFormatContext) =>
+          ffGeneric.identifier.getText.toLowerCase(Locale.ROOT) match {
+            case "textfile" => // OK
+            case fmt => operationNotAllowed(
+              s"ROW FORMAT DELIMITED is only compatible with 'textfile', not '$fmt'", parentCtx)
+          }
+        case _ =>
+          // should never happen
+          def str(ctx: ParserRuleContext): String = {
+            (0 until ctx.getChildCount).map { i => ctx.getChild(i).getText }.mkString(" ")
+          }
+
+          operationNotAllowed(
+            s"Unexpected combination of ${str(rowFormatCtx)} and ${str(createFileFormatCtx)}",
+            parentCtx)
+      }
+    }
+  }
+
+  protected def validateRowFormatFileFormat(
+                                             rowFormatCtx: Seq[RowFormatContext],
+                                             createFileFormatCtx: Seq[CreateFileFormatContext],
+                                             parentCtx: ParserRuleContext): Unit = {
+    if (rowFormatCtx.size == 1 && createFileFormatCtx.size == 1) {
+      validateRowFormatFileFormat(rowFormatCtx.head, createFileFormatCtx.head, parentCtx)
+    }
+  }
+
+  override def visitCreateTableClauses(ctx: CreateTableClausesContext): TableClauses = {
+    checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
+    checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx)
+    checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
+    checkDuplicateClauses(ctx.createFileFormat, "STORED AS/BY", ctx)
+    checkDuplicateClauses(ctx.rowFormat, "ROW FORMAT", ctx)
+    checkDuplicateClauses(ctx.commentSpec(), "COMMENT", ctx)
+    checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
+    checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
+
+    if (ctx.skewSpec.size > 0) {
+      operationNotAllowed("CREATE TABLE ... SKEWED BY", ctx)
+    }
+
+    val (partTransforms, partCols) =
+      Option(ctx.partitioning).map(visitPartitionFieldList).getOrElse((Nil, Nil))
+    val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec)
+    val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
+    val cleanedProperties = cleanTableProperties(ctx, properties)
+    val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
+    val location = visitLocationSpecList(ctx.locationSpec())
+    val (cleanedOptions, newLocation) = cleanTableOptions(ctx, options, location)
+    val comment = visitCommentSpecList(ctx.commentSpec())
+    val serdeInfo =
+      getSerdeInfo(ctx.rowFormat.asScala.toSeq, ctx.createFileFormat.asScala.toSeq, ctx)
+    (partTransforms, partCols, bucketSpec, cleanedProperties, cleanedOptions, newLocation, comment,
+      serdeInfo)
+  }
+
+  protected def getSerdeInfo(
+                              rowFormatCtx: Seq[RowFormatContext],
+                              createFileFormatCtx: Seq[CreateFileFormatContext],
+                              ctx: ParserRuleContext): Option[SerdeInfo] = {
+    validateRowFormatFileFormat(rowFormatCtx, createFileFormatCtx, ctx)
+    val rowFormatSerdeInfo = rowFormatCtx.map(visitRowFormat)
+    val fileFormatSerdeInfo = createFileFormatCtx.map(visitCreateFileFormat)
+    (fileFormatSerdeInfo ++ rowFormatSerdeInfo).reduceLeftOption((l, r) => l.merge(r))
+  }
+
+  private def partitionExpressions(
+                                    partTransforms: Seq[Transform],
+                                    partCols: Seq[StructField],
+                                    ctx: ParserRuleContext): Seq[Transform] = {
+    if (partTransforms.nonEmpty) {
+      if (partCols.nonEmpty) {
+        val references = partTransforms.map(_.describe()).mkString(", ")
+        val columns = partCols
+          .map(field => s"${field.name} ${field.dataType.simpleString}")
+          .mkString(", ")
+        operationNotAllowed(
+          s"""PARTITION BY: Cannot mix partition expressions and partition columns:
+             |Expressions: $references
+             |Columns: $columns""".stripMargin, ctx)
+
+      }
+      partTransforms
+    } else {
+      // columns were added to create the schema. convert to column references
+      partCols.map { column =>
+        IdentityTransform(FieldReference(Seq(column.name)))
+      }
+    }
+  }
+
+  /**
+   * Create a table, returning a [[CreateTable]] or [[CreateTableAsSelect]] logical plan.
+   *
+   * Expected format:
+   * {{{
+   *   CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name
+   *   [USING table_provider]
+   *   create_table_clauses
+   *   [[AS] select_statement];
+   *
+   *   create_table_clauses (order insensitive):
+   *     [PARTITIONED BY (partition_fields)]
+   *     [OPTIONS table_property_list]
+   *     [ROW FORMAT row_format]
+   *     [STORED AS file_format]
+   *     [CLUSTERED BY (col_name, col_name, ...)
+   *       [SORTED BY (col_name [ASC|DESC], ...)]
+   *       INTO num_buckets BUCKETS
+   *     ]
+   *     [LOCATION path]
+   *     [COMMENT table_comment]
+   *     [TBLPROPERTIES (property_name=property_value, ...)]
+   *
+   *   partition_fields:
+   *     col_name, transform(col_name), transform(constant, col_name), ... |
+   *     col_name data_type [NOT NULL] [COMMENT col_comment], ...
+   * }}}
+   */
+  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
+    val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
+
+    val columns = Option(ctx.colTypeList()).map(visitColTypeList).getOrElse(Nil)
+    val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
+    val (partTransforms, partCols, bucketSpec, properties, options, location, comment, serdeInfo) =
+      visitCreateTableClauses(ctx.createTableClauses())
+
+    if (provider.isDefined && serdeInfo.isDefined) {
+      operationNotAllowed(s"CREATE TABLE ... USING ... ${serdeInfo.get.describe}", ctx)
+    }
+
+    if (temp) {
+      val asSelect = if (ctx.query == null) "" else " AS ..."
+      operationNotAllowed(
+        s"CREATE TEMPORARY TABLE ...$asSelect, use CREATE TEMPORARY VIEW instead", ctx)
+    }
+
+    // partition transforms for BucketSpec was moved inside parser
+    // https://issues.apache.org/jira/browse/SPARK-37923
+    val partitioning =
+      partitionExpressions(partTransforms, partCols, ctx) ++ bucketSpec.map(_.asTransform)
+    val tableSpec = TableSpec(properties, provider, options, location, comment,
+      serdeInfo, external)
+
+    Option(ctx.query).map(plan) match {
+      case Some(_) if columns.nonEmpty =>
+        operationNotAllowed(
+          "Schema may not be specified in a Create Table As Select (CTAS) statement",
+          ctx)
+
+      case Some(_) if partCols.nonEmpty =>
+        // non-reference partition columns are not allowed because schema can't be specified
+        operationNotAllowed(
+          "Partition column types may not be specified in Create Table As Select (CTAS)",
+          ctx)
+
+        // CreateTable / CreateTableAsSelect was migrated to v2 in Spark 3.3.0

Review Comment:
   Change made according to https://issues.apache.org/jira/browse/SPARK-36850



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org