You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by le...@apache.org on 2022/06/27 04:51:05 UTC

[hudi] branch master updated: [HUDI-4315] Do not throw exception in BaseSpark3Adapter#toTableIdentifier (#5957)

This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f4e2a189e [HUDI-4315] Do not throw exception in BaseSpark3Adapter#toTableIdentifier (#5957)
8f4e2a189e is described below

commit 8f4e2a189e0d89665aaa2a2487c13971ab87b1ef
Author: leesf <49...@qq.com>
AuthorDate: Mon Jun 27 12:50:58 2022 +0800

    [HUDI-4315] Do not throw exception in BaseSpark3Adapter#toTableIdentifier (#5957)
---
 .../apache/spark/sql/hudi/TestInsertTable.scala    | 44 ++++++++++++++++++++++
 .../spark/sql/adapter/BaseSpark3Adapter.scala      | 17 ++++++---
 2 files changed, 56 insertions(+), 5 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index fc9de60c67..3f9066d084 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hudi
 
 import org.apache.hudi.DataSourceWriteOptions.{KEYGENERATOR_CLASS_NAME, MOR_TABLE_TYPE_OPT_VAL, PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD, TABLE_TYPE}
+import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieDuplicateKeyException
@@ -696,4 +697,47 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
       }
     }
   }
+
+  test("Test Insert Into With Catalog Identifier for spark >= 3.2.0") {
+    Seq("hudi", "parquet").foreach { format =>
+      withTempDir { tmp =>
+        val tableName = s"spark_catalog.default.$generateTableName"
+        // Create a partitioned table
+        if (HoodieSparkUtils.gteqSpark3_2) {
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  ts long,
+               |  dt string
+               |) using $format
+               | tblproperties (primaryKey = 'id')
+               | partitioned by (dt)
+               | location '${tmp.getCanonicalPath}'
+       """.stripMargin)
+          // Insert into dynamic partition
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-01-05' as dt
+        """.stripMargin)
+          checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+            Seq(1, "a1", 10.0, 1000, "2021-01-05")
+          )
+          // Insert into static partition
+          spark.sql(
+            s"""
+               | insert into $tableName partition(dt = '2021-01-05')
+               | select 2 as id, 'a2' as name, 10 as price, 1000 as ts
+        """.stripMargin)
+          checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+            Seq(1, "a1", 10.0, 1000, "2021-01-05"),
+            Seq(2, "a2", 10.0, 1000, "2021-01-05")
+          )
+        }
+      }
+    }
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
index 48d323c8f2..115913c230 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
@@ -19,8 +19,9 @@ package org.apache.spark.sql.adapter
 
 import org.apache.hudi.Spark3RowSerDe
 import org.apache.hudi.client.utils.SparkRowSerDe
-import org.apache.spark.SPARK_VERSION
 import org.apache.hudi.spark3.internal.ReflectUtil
+import org.apache.spark.SPARK_VERSION
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, HoodieSparkAvroSchemaConverters}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -28,21 +29,21 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredica
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LogicalPlan}
-import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.hudi.SparkAdapter
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.{Row, SparkSession}
 
+import scala.util.control.NonFatal
+
 /**
  * Base implementation of [[SparkAdapter]] for Spark 3.x branch
  */
-abstract class BaseSpark3Adapter extends SparkAdapter {
+abstract class BaseSpark3Adapter extends SparkAdapter with Logging {
 
   override def createSparkRowSerDe(encoder: ExpressionEncoder[Row]): SparkRowSerDe = {
     new Spark3RowSerDe(encoder)
@@ -115,7 +116,13 @@ abstract class BaseSpark3Adapter extends SparkAdapter {
     unfoldSubqueryAliases(table) match {
       case LogicalRelation(_, _, Some(table), _) => isHoodieTable(table)
       case relation: UnresolvedRelation =>
-        isHoodieTable(toTableIdentifier(relation), spark)
+        try {
+          isHoodieTable(toTableIdentifier(relation), spark)
+        } catch {
+          case NonFatal(e) =>
+            logWarning("Failed to determine whether the table is a hoodie table", e)
+            false
+        }
       case DataSourceV2Relation(table: Table, _, _, _, _) => isHoodieTable(table.properties())
       case _=> false
     }