You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/01/24 01:55:05 UTC

[GitHub] [hudi] xushiyan commented on a change in pull request #4611: [HUDI-3254] Introduce HoodieCatalog to manage tables for Spark Datasource V2

xushiyan commented on a change in pull request #4611:
URL: https://github.com/apache/hudi/pull/4611#discussion_r790364397



##########
File path: hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieConfigHelper.scala
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.catalog
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
+import org.apache.hudi.hive.MultiPartKeysValueExtractor
+import org.apache.hudi.hive.ddl.HiveSyncMode
+import org.apache.hudi.keygen.ComplexKeyGenerator
+import org.apache.hudi.sql.InsertMode
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{castIfNeeded, isEnableHive, withSparkConf}
+import org.apache.spark.sql.hudi.command.{SqlKeyGenerator, ValidateDuplicateKeyPayload}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructField
+
+import scala.collection.JavaConverters.propertiesAsScalaMapConverter
+
+trait HoodieConfigHelper extends Logging {
+
+  def buildHoodieConfig(hoodieCatalogTable: HoodieCatalogTable): Map[String, String] = {
+    val sparkSession: SparkSession = hoodieCatalogTable.spark
+    val catalogProperties = hoodieCatalogTable.catalogProperties
+    val tableConfig = hoodieCatalogTable.tableConfig
+    val tableId = hoodieCatalogTable.table.identifier
+
+    val preCombineColumn = Option(tableConfig.getPreCombineField).getOrElse("")

Review comment:
       try to keep name aligned for clarity; applies to the rest of the file too
   
   ```suggestion
       val preCombineField = Option(tableConfig.getPreCombineField).getOrElse("")
   ```

##########
File path: hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieConfigHelper.scala
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.catalog
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
+import org.apache.hudi.hive.MultiPartKeysValueExtractor
+import org.apache.hudi.hive.ddl.HiveSyncMode
+import org.apache.hudi.keygen.ComplexKeyGenerator
+import org.apache.hudi.sql.InsertMode
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{castIfNeeded, isEnableHive, withSparkConf}
+import org.apache.spark.sql.hudi.command.{SqlKeyGenerator, ValidateDuplicateKeyPayload}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructField
+
+import scala.collection.JavaConverters.propertiesAsScalaMapConverter
+
+trait HoodieConfigHelper extends Logging {
+
+  def buildHoodieConfig(hoodieCatalogTable: HoodieCatalogTable): Map[String, String] = {
+    val sparkSession: SparkSession = hoodieCatalogTable.spark
+    val catalogProperties = hoodieCatalogTable.catalogProperties
+    val tableConfig = hoodieCatalogTable.tableConfig
+    val tableId = hoodieCatalogTable.table.identifier
+
+    val preCombineColumn = Option(tableConfig.getPreCombineField).getOrElse("")
+    assert(hoodieCatalogTable.primaryKeys.nonEmpty,
+      s"There are no primary key in table ${hoodieCatalogTable.table.identifier}, cannot execute update operator")
+    val enableHive = isEnableHive(sparkSession)
+
+    withSparkConf(sparkSession, catalogProperties) {
+      Map(
+        "path" -> hoodieCatalogTable.tableLocation,
+        RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+        PRECOMBINE_FIELD.key -> preCombineColumn,
+        TBL_NAME.key -> hoodieCatalogTable.tableName,
+        HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
+        URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
+        KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
+        SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
+        OPERATION.key -> UPSERT_OPERATION_OPT_VAL,
+        PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
+        META_SYNC_ENABLED.key -> enableHive.toString,
+        HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
+        HIVE_USE_JDBC.key -> "false",
+        HIVE_DATABASE.key -> tableId.database.getOrElse("default"),
+        HIVE_TABLE.key -> tableId.table,
+        HIVE_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
+        HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
+        HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
+        HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
+        SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
+      )
+    }
+  }
+
+  def cast(exp: Expression, field: StructField, sqlConf: SQLConf): Expression = {
+    castIfNeeded(exp, field.dataType, sqlConf)
+  }
+
+  /**
+   * Build the default config for insert.
+   * @return
+   */
+    def buildHoodieInsertConfig(hoodieCatalogTable: HoodieCatalogTable,
+                                sparkSession: SparkSession,
+                                isOverwrite: Boolean,
+                                insertPartitions: Map[String, Option[String]] = Map.empty,
+                                extraOptions: Map[String, String]): Map[String, String] = {
+
+    if (insertPartitions.nonEmpty &&
+      (insertPartitions.keys.toSet != hoodieCatalogTable.partitionFields.toSet)) {
+      throw new IllegalArgumentException(s"Insert partition fields" +
+        s"[${insertPartitions.keys.mkString(" " )}]" +
+        s" not equal to the defined partition in table[${hoodieCatalogTable.partitionFields.mkString(",")}]")
+    }

Review comment:
       can be replaced with `require`

##########
File path: hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HudiSpark3SqlUtils.scala
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, IdentityTransform, Transform}
+
+import scala.collection.mutable
+
+object HudiSpark3SqlUtils {

Review comment:
       ```suggestion
   object HoodieSpark3SqlUtils {
   ```

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
##########
@@ -54,6 +54,10 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
     override def get() = new SimpleDateFormat("yyyy-MM-dd")
   })
 
+  def isHoodieTable(properties: Map[String, String]): Boolean = {
+    properties.getOrElse("provider", "").toLowerCase(Locale.ROOT) == "hudi"
+  }
+

Review comment:
       this and other overloaded `isHoodieTable()` duplicate the ones in spark adapter? 

##########
File path: hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieConfigHelper.scala
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.catalog
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
+import org.apache.hudi.hive.MultiPartKeysValueExtractor
+import org.apache.hudi.hive.ddl.HiveSyncMode
+import org.apache.hudi.keygen.ComplexKeyGenerator
+import org.apache.hudi.sql.InsertMode
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{castIfNeeded, isEnableHive, withSparkConf}
+import org.apache.spark.sql.hudi.command.{SqlKeyGenerator, ValidateDuplicateKeyPayload}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructField
+
+import scala.collection.JavaConverters.propertiesAsScalaMapConverter
+
+trait HoodieConfigHelper extends Logging {
+
+  def buildHoodieConfig(hoodieCatalogTable: HoodieCatalogTable): Map[String, String] = {
+    val sparkSession: SparkSession = hoodieCatalogTable.spark
+    val catalogProperties = hoodieCatalogTable.catalogProperties
+    val tableConfig = hoodieCatalogTable.tableConfig
+    val tableId = hoodieCatalogTable.table.identifier
+
+    val preCombineColumn = Option(tableConfig.getPreCombineField).getOrElse("")
+    assert(hoodieCatalogTable.primaryKeys.nonEmpty,
+      s"There are no primary key in table ${hoodieCatalogTable.table.identifier}, cannot execute update operator")
+    val enableHive = isEnableHive(sparkSession)
+
+    withSparkConf(sparkSession, catalogProperties) {
+      Map(
+        "path" -> hoodieCatalogTable.tableLocation,
+        RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+        PRECOMBINE_FIELD.key -> preCombineColumn,
+        TBL_NAME.key -> hoodieCatalogTable.tableName,
+        HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
+        URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
+        KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
+        SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
+        OPERATION.key -> UPSERT_OPERATION_OPT_VAL,
+        PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
+        META_SYNC_ENABLED.key -> enableHive.toString,
+        HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
+        HIVE_USE_JDBC.key -> "false",
+        HIVE_DATABASE.key -> tableId.database.getOrElse("default"),
+        HIVE_TABLE.key -> tableId.table,
+        HIVE_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
+        HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
+        HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
+        HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
+        SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
+      )
+    }
+  }
+
+  def cast(exp: Expression, field: StructField, sqlConf: SQLConf): Expression = {
+    castIfNeeded(exp, field.dataType, sqlConf)
+  }

Review comment:
       is this needed? just wrapping another method

##########
File path: hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieConfigHelper.scala
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.catalog
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
+import org.apache.hudi.hive.MultiPartKeysValueExtractor
+import org.apache.hudi.hive.ddl.HiveSyncMode
+import org.apache.hudi.keygen.ComplexKeyGenerator
+import org.apache.hudi.sql.InsertMode
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{castIfNeeded, isEnableHive, withSparkConf}
+import org.apache.spark.sql.hudi.command.{SqlKeyGenerator, ValidateDuplicateKeyPayload}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructField
+
+import scala.collection.JavaConverters.propertiesAsScalaMapConverter
+
+trait HoodieConfigHelper extends Logging {

Review comment:
       ~better use a singleton object instead of a trait ?~
   
   Saw that HoodieCatalog implementing this, so it reads better with `ProvidesHoodieConfig` as a trait ?

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
##########
@@ -57,7 +57,8 @@ case class AlterHoodieTableAddColumnsCommand(
           s" table columns is: [${hoodieCatalogTable.tableSchemaWithoutMetaFields.fieldNames.mkString(",")}]")
       }
       // Get the new schema
-      val newSqlSchema = StructType(tableSchema.fields ++ colsToAdd)
+      val rearrangeSchema = hoodieCatalogTable.dataSchema ++ colsToAdd ++ hoodieCatalogTable.partitionSchema

Review comment:
       ```suggestion
         val rearrangedSchema = hoodieCatalogTable.dataSchema ++ colsToAdd ++ hoodieCatalogTable.partitionSchema
   ```

##########
File path: hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/BaseStagedTable.scala
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.catalog
+
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.sql.connector.catalog._
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+import org.apache.spark.sql.types.StructType
+
+import java.util
+
+case class BaseStagedTable(ident: Identifier,

Review comment:
       some docs for this class ?

##########
File path: hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.catalog
+
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
+import org.apache.spark.sql.connector.catalog.TableCapability._
+import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability, V2TableWithV1Fallback}
+import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}
+import org.apache.spark.sql.connector.write._
+import org.apache.spark.sql.sources.{Filter, InsertableRelation}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+
+import java.util
+import scala.collection.JavaConverters.{mapAsJavaMapConverter, setAsJavaSetConverter}
+
+case class HoodieInternalV2Table(spark: SparkSession,
+                                 path: String,
+                                 catalogTable: Option[CatalogTable] = None,
+                                 tableIdentifier: Option[String] = None,
+                                 options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty())
+  extends Table with SupportsWrite with V2TableWithV1Fallback {
+
+  lazy val hoodieCatalogTable: HoodieCatalogTable = if (catalogTable.isDefined) {
+    HoodieCatalogTable(spark, catalogTable.get)
+  } else {
+    val metaClient: HoodieTableMetaClient = HoodieTableMetaClient.builder()
+      .setBasePath(path)
+      .setConf(SparkSession.active.sessionState.newHadoopConf)
+      .build()
+
+    val tableConfig: HoodieTableConfig = metaClient.getTableConfig
+    val tableName: String = tableConfig.getTableName
+
+    HoodieCatalogTable(spark, TableIdentifier(tableName))
+  }
+
+  private lazy val tableSchema: StructType = hoodieCatalogTable.tableSchema
+
+  override def name(): String = hoodieCatalogTable.table.identifier.unquotedString
+
+  override def schema(): StructType = tableSchema
+
+  override def capabilities(): util.Set[TableCapability] = Set(
+    BATCH_READ, V1_BATCH_WRITE, OVERWRITE_BY_FILTER, TRUNCATE, ACCEPT_ANY_SCHEMA
+  ).asJava
+
+  override def properties(): util.Map[String, String] = {
+    val map = new util.HashMap[String, String]()
+    map.put("provider", "hudi")

Review comment:
       why not make this part of catalog properties ?

##########
File path: hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.catalog
+
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
+import org.apache.spark.sql.connector.catalog.TableCapability._
+import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability, V2TableWithV1Fallback}
+import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}
+import org.apache.spark.sql.connector.write._
+import org.apache.spark.sql.sources.{Filter, InsertableRelation}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+
+import java.util
+import scala.collection.JavaConverters.{mapAsJavaMapConverter, setAsJavaSetConverter}
+
+case class HoodieInternalV2Table(spark: SparkSession,
+                                 path: String,
+                                 catalogTable: Option[CatalogTable] = None,
+                                 tableIdentifier: Option[String] = None,
+                                 options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty())
+  extends Table with SupportsWrite with V2TableWithV1Fallback {
+
+  lazy val hoodieCatalogTable: HoodieCatalogTable = if (catalogTable.isDefined) {
+    HoodieCatalogTable(spark, catalogTable.get)
+  } else {
+    val metaClient: HoodieTableMetaClient = HoodieTableMetaClient.builder()
+      .setBasePath(path)
+      .setConf(SparkSession.active.sessionState.newHadoopConf)
+      .build()
+
+    val tableConfig: HoodieTableConfig = metaClient.getTableConfig
+    val tableName: String = tableConfig.getTableName
+
+    HoodieCatalogTable(spark, TableIdentifier(tableName))
+  }
+
+  private lazy val tableSchema: StructType = hoodieCatalogTable.tableSchema
+
+  override def name(): String = hoodieCatalogTable.table.identifier.unquotedString
+
+  override def schema(): StructType = tableSchema
+
+  override def capabilities(): util.Set[TableCapability] = Set(
+    BATCH_READ, V1_BATCH_WRITE, OVERWRITE_BY_FILTER, TRUNCATE, ACCEPT_ANY_SCHEMA
+  ).asJava
+
+  override def properties(): util.Map[String, String] = {
+    val map = new util.HashMap[String, String]()
+    map.put("provider", "hudi")
+    map.putAll(hoodieCatalogTable.catalogProperties.asJava)
+    map
+  }
+
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+    new WriteIntoHoodieBuilder(info.options, hoodieCatalogTable, spark)
+  }
+
+  override def v1Table: CatalogTable = hoodieCatalogTable.table
+
+  override def partitioning(): Array[Transform] = {
+    hoodieCatalogTable.partitionFields.map { col =>
+      new IdentityTransform(new FieldReference(Seq(col)))
+    }.toArray
+  }
+
+}
+
+private class WriteIntoHoodieBuilder(writeOptions: CaseInsensitiveStringMap,

Review comment:
       to be explicit, better call `HoodieV1WriteBuilder` ?

##########
File path: hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieConfigHelper.scala
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.catalog
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
+import org.apache.hudi.hive.MultiPartKeysValueExtractor
+import org.apache.hudi.hive.ddl.HiveSyncMode
+import org.apache.hudi.keygen.ComplexKeyGenerator
+import org.apache.hudi.sql.InsertMode
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{castIfNeeded, isEnableHive, withSparkConf}
+import org.apache.spark.sql.hudi.command.{SqlKeyGenerator, ValidateDuplicateKeyPayload}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructField
+
+import scala.collection.JavaConverters.propertiesAsScalaMapConverter
+
+trait HoodieConfigHelper extends Logging {
+
+  def buildHoodieConfig(hoodieCatalogTable: HoodieCatalogTable): Map[String, String] = {
+    val sparkSession: SparkSession = hoodieCatalogTable.spark
+    val catalogProperties = hoodieCatalogTable.catalogProperties
+    val tableConfig = hoodieCatalogTable.tableConfig
+    val tableId = hoodieCatalogTable.table.identifier
+
+    val preCombineColumn = Option(tableConfig.getPreCombineField).getOrElse("")
+    assert(hoodieCatalogTable.primaryKeys.nonEmpty,

Review comment:
       better use `require` instead of `assert`

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
##########
@@ -42,12 +43,39 @@ object HoodieAnalysis {
     Seq(
       session => HoodieResolveReferences(session),
       session => HoodieAnalysis(session)
-    )
+    ) ++ extraResolutionRules()
 
   def customPostHocResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] =
     Seq(
       session => HoodiePostAnalysisRule(session)
-    )
+    ) ++ extraPostHocResolutionRules()
+
+  def extraResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] = {
+    if (HoodieSparkUtils.isSpark3_2) {
+      val spark3AnalysisClass = "org.apache.spark.sql.hudi.analysis.HoodieSpark3Analysis"
+      val spark3Analysis: SparkSession => Rule[LogicalPlan] =
+        session => ReflectionUtils.loadClass(spark3AnalysisClass, session).asInstanceOf[Rule[LogicalPlan]]
+
+      val spark3ResolveReferences = "org.apache.spark.sql.hudi.analysis.HoodieSpark3ResolveReferences"
+      val spark3References: SparkSession => Rule[LogicalPlan] =
+        session => ReflectionUtils.loadClass(spark3ResolveReferences, session).asInstanceOf[Rule[LogicalPlan]]
+
+      Seq(spark3Analysis, spark3References)
+    } else {
+      Seq.empty
+    }
+  }
+
+  def extraPostHocResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] =
+    if (HoodieSparkUtils.isSpark3_2) {

Review comment:
       ditto

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
##########
@@ -42,12 +43,39 @@ object HoodieAnalysis {
     Seq(
       session => HoodieResolveReferences(session),
       session => HoodieAnalysis(session)
-    )
+    ) ++ extraResolutionRules()
 
   def customPostHocResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] =
     Seq(
       session => HoodiePostAnalysisRule(session)
-    )
+    ) ++ extraPostHocResolutionRules()
+
+  def extraResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] = {
+    if (HoodieSparkUtils.isSpark3_2) {

Review comment:
       assuming in future spark 3 versions like 3.3 this case also applies, then we should have something like `HoodieSparkutils.beforeSpark3_2` and negate the condition

##########
File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
##########
@@ -94,11 +94,17 @@ class TestHoodieSparkSqlWriter {
    * Utility method for initializing the spark context.
    */
   def initSparkContext(): Unit = {
+    val sparkConf = new SparkConf()
+    if (HoodieSparkUtils.isSpark3_2) {
+      sparkConf.set("spark.sql.catalog.spark_catalog",
+        "org.apache.spark.sql.hudi.catalog.HoodieCatalog")

Review comment:
       can you help me understand why only set this for spark 3.2 pls?

##########
File path: hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
##########
@@ -92,4 +95,31 @@ trait SparkAdapter extends Serializable {
    * ParserInterface#parseMultipartIdentifier is supported since spark3, for spark2 this should not be called.
    */
   def parseMultipartIdentifier(parser: ParserInterface, sqlText: String): Seq[String]
+
+  def isHoodieTable(table: LogicalPlan, spark: SparkSession): Boolean = {
+    tripAlias(table) match {
+      case LogicalRelation(_, _, Some(tbl), _) => isHoodieTable(tbl)
+      case relation: UnresolvedRelation =>
+        isHoodieTable(toTableIdentifier(relation), spark)
+      case _=> false
+    }
+  }
+
+  def tripAlias(plan: LogicalPlan): LogicalPlan = {
+    plan match {
+      case SubqueryAlias(_, relation: LogicalPlan) =>
+        tripAlias(relation)
+      case other =>
+        other
+    }
+  }
+
+  def isHoodieTable(table: CatalogTable): Boolean = {
+    table.provider.map(_.toLowerCase(Locale.ROOT)).orNull == "hudi"
+  }
+
+  def isHoodieTable(tableId: TableIdentifier, spark: SparkSession): Boolean = {
+    val table = spark.sessionState.catalog.getTableMetadata(tableId)
+    isHoodieTable(table)
+  }

Review comment:
       it'll be nice to move these 2 up closer to their overloading method above `tripAlias()`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org