You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by fo...@apache.org on 2022/10/26 05:11:20 UTC
[hudi] branch master updated: temp_view_support (#6990)
This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e13b2129dc temp_view_support (#6990)
e13b2129dc is described below
commit e13b2129dc144ca505e39c0d7fa479c47362bb56
Author: 苏承祥 <sc...@aliyun.com>
AuthorDate: Wed Oct 26 13:11:15 2022 +0800
temp_view_support (#6990)
Co-authored-by: 苏承祥 <su...@tuya.com>
---
.../hudi/command/procedures/CopyToTempView.scala | 114 ++++++++++++++
.../hudi/command/procedures/HoodieProcedures.scala | 1 +
.../procedure/TestCopyToTempViewProcedure.scala | 168 +++++++++++++++++++++
3 files changed, 283 insertions(+)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CopyToTempView.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CopyToTempView.scala
new file mode 100644
index 0000000000..13259c4964
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CopyToTempView.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.DataSourceReadOptions
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util.function.Supplier
+
+/**
+ * Procedure `copy_to_temp_view`: reads a Hudi table with the requested query type and
+ * registers the resulting DataFrame as a temporary view.
+ *
+ * Arguments:
+ *  - `table` (required): the table to read; its base path is resolved by `getBasePath`.
+ *  - `query_type`: `snapshot` (default), `incremental` or `read_optimized`.
+ *  - `view_name` (required): name of the view to register.
+ *  - `begin_instance_time`: start of the incremental range; required when `query_type` is incremental.
+ *  - `end_instance_time`: end of the incremental range; when empty the END_INSTANTTIME option is
+ *    not set, leaving the end of the range to the Hudi reader's default.
+ *  - `as_of_instant`: time-travel instant for snapshot queries; ignored for other query types.
+ *  - `replace`: replace an existing view with the same name instead of failing.
+ *  - `global`: register a global temporary view instead of a session-scoped one.
+ *
+ * Returns a single row whose `status` column is 0 on success.
+ */
+class CopyToTempView extends BaseProcedure with ProcedureBuilder with Logging {
+
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "query_type", DataTypes.StringType, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL),
+    ProcedureParameter.required(2, "view_name", DataTypes.StringType, None),
+    ProcedureParameter.optional(3, "begin_instance_time", DataTypes.StringType, ""),
+    ProcedureParameter.optional(4, "end_instance_time", DataTypes.StringType, ""),
+    ProcedureParameter.optional(5, "as_of_instant", DataTypes.StringType, ""),
+    ProcedureParameter.optional(6, "replace", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(7, "global", DataTypes.BooleanType, false)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("status", DataTypes.IntegerType, nullable = true, Metadata.empty))
+  )
+
+  private val SUPPORTED_QUERY_TYPES = Seq(
+    DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL,
+    DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL,
+    DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL
+  )
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val queryType = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
+    val viewName = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String]
+    val beginInstance = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[String]
+    val endInstance = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[String]
+    val asOfInstant = getArgValueOrDefault(args, PARAMETERS(5)).get.asInstanceOf[String]
+    val replace = getArgValueOrDefault(args, PARAMETERS(6)).get.asInstanceOf[Boolean]
+    val global = getArgValueOrDefault(args, PARAMETERS(7)).get.asInstanceOf[Boolean]
+
+    val sourceDataFrame = loadTable(getBasePath(tableName), queryType, beginInstance, endInstance, asOfInstant)
+    registerView(sourceDataFrame, viewName, replace, global)
+    Seq(Row(0))
+  }
+
+  /**
+   * Builds a Hudi reader for `tablePath` configured for `queryType` and loads the table.
+   *
+   * @throws IllegalArgumentException if `queryType` is not one of the supported query types,
+   *                                  or an incremental query is requested without `beginInstance`
+   */
+  private def loadTable(
+      tablePath: String,
+      queryType: String,
+      beginInstance: String,
+      endInstance: String,
+      asOfInstant: String): org.apache.spark.sql.DataFrame = {
+    val reader = sparkSession.read
+      .format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key, queryType)
+
+    val configuredReader = queryType match {
+      case DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL =>
+        if (asOfInstant.nonEmpty) {
+          reader.option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, asOfInstant)
+        } else {
+          reader
+        }
+      case DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL =>
+        // `require` rather than `assert`: this validates user input, so it must not be
+        // elidable and should surface as an IllegalArgumentException.
+        require(beginInstance.nonEmpty,
+          "when the query_type is incremental, begin_instance_time can not be empty.")
+        val withBegin = reader.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, beginInstance)
+        // END_INSTANTTIME is optional for Hudi incremental reads; only set it when provided.
+        if (endInstance.nonEmpty) {
+          withBegin.option(DataSourceReadOptions.END_INSTANTTIME.key, endInstance)
+        } else {
+          withBegin
+        }
+      case DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL =>
+        reader
+      case unsupported =>
+        // Without this case an unknown query_type would fail with an opaque scala.MatchError.
+        throw new IllegalArgumentException(
+          s"Unsupported query_type: '$unsupported'. Supported values: ${SUPPORTED_QUERY_TYPES.mkString(", ")}")
+    }
+    configuredReader.load(tablePath)
+  }
+
+  /** Registers `dataFrame` as a session-scoped or global temp view, replacing an existing one if asked. */
+  private def registerView(
+      dataFrame: org.apache.spark.sql.DataFrame,
+      viewName: String,
+      replace: Boolean,
+      global: Boolean): Unit = (global, replace) match {
+    case (true, true) => dataFrame.createOrReplaceGlobalTempView(viewName)
+    case (true, false) => dataFrame.createGlobalTempView(viewName)
+    case (false, true) => dataFrame.createOrReplaceTempView(viewName)
+    case (false, false) => dataFrame.createTempView(viewName)
+  }
+
+  override def build = new CopyToTempView()
+}
+
+/** Registration hooks used to expose [[CopyToTempView]] as a callable procedure. */
+object CopyToTempView {
+  /** Name under which the procedure is invoked, e.g. `call copy_to_temp_view(...)`. */
+  val NAME: String = "copy_to_temp_view"
+
+  /** Returns a supplier that creates a new [[CopyToTempView]] instance each time `get()` is called. */
+  def builder: Supplier[ProcedureBuilder] = {
+    val supplier = new Supplier[ProcedureBuilder] {
+      override def get(): ProcedureBuilder = new CopyToTempView
+    }
+    supplier
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index b2bbec8489..713cd5d7da 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -80,6 +80,7 @@ object HoodieProcedures {
,(ValidateHoodieSyncProcedure.NAME, ValidateHoodieSyncProcedure.builder)
,(ShowInvalidParquetProcedure.NAME, ShowInvalidParquetProcedure.builder)
,(HiveSyncProcedure.NAME, HiveSyncProcedure.builder)
+ ,(CopyToTempView.NAME, CopyToTempView.builder)
)
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCopyToTempViewProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCopyToTempViewProcedure.scala
new file mode 100644
index 0000000000..13da259df1
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCopyToTempViewProcedure.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
+
+/** Tests for the `copy_to_temp_view` procedure. */
+class TestCopyToTempViewProcedure extends HoodieSparkSqlTestBase {
+
+  test("Test Call copy_to_temp_view Procedure with default params") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      createTableWithFourRecords(tmp.getCanonicalPath, tableName)
+
+      // Check required fields
+      checkExceptionContain(s"call copy_to_temp_view(table=>'$tableName')")(s"Argument: view_name is required")
+
+      val viewName = generateTableName
+      assertSingleValue(spark, s"""call copy_to_temp_view(table=>'$tableName',view_name=>'$viewName')""", 0)
+      assertSingleValue(spark, s"""select count(1) from $viewName""", 4L)
+    }
+  }
+
+  test("Test Call copy_to_temp_view Procedure with replace params") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      createTableWithFourRecords(tmp.getCanonicalPath, tableName)
+
+      // Check required fields
+      checkExceptionContain(s"call copy_to_temp_view(table=>'$tableName')")(s"Argument: view_name is required")
+
+      // 1: copyToTempView
+      val viewName = generateTableName
+      assertSingleValue(spark, s"""call copy_to_temp_view(table=>'$tableName',view_name=>'$viewName')""", 0)
+      assertSingleValue(spark, s"""select count(1) from $viewName""", 4L)
+
+      // 2: add new record to hudi table
+      spark.sql(s"insert into $tableName select 5, 'a5', 40, 2500")
+
+      // 3: copyToTempView with replace=false must fail because the view already exists
+      checkExceptionContain(s"""call copy_to_temp_view(table=>'$tableName',view_name=>'$viewName',replace=>false)""")(s"Temporary view '$viewName' already exists")
+      // 4: copyToTempView with replace=true
+      assertSingleValue(spark, s"""call copy_to_temp_view(table=>'$tableName',view_name=>'$viewName',replace=>true)""", 0)
+      // 5: the replaced view sees the new record, count=5
+      assertSingleValue(spark, s"""select count(1) from $viewName""", 5L)
+    }
+  }
+
+  test("Test Call copy_to_temp_view Procedure with global params") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      createTableWithFourRecords(tmp.getCanonicalPath, tableName)
+
+      // Check required fields
+      checkExceptionContain(s"call copy_to_temp_view(table=>'$tableName')")(s"Argument: view_name is required")
+
+      // 1: copyToTempView with global=false
+      val viewName = generateTableName
+      assertSingleValue(spark, s"""call copy_to_temp_view(table=>'$tableName',view_name=>'$viewName',global=>false)""", 0)
+      assertSingleValue(spark, s"""select count(1) from $viewName""", 4L)
+
+      // 2: a session-scoped temp view must not be visible from another session
+      val errorMsg = s"Table or view not found: $viewName"
+      val notFound = intercept[Exception] {
+        spark.newSession().sql(s"""select count(1) from $viewName""")
+      }
+      assert(notFound.getMessage.contains(errorMsg),
+        s"Exception should contain: $errorMsg, error message: ${notFound.getMessage}")
+
+      // 3: copyToTempView with global=true
+      assertSingleValue(spark, s"""call copy_to_temp_view(table=>'$tableName',view_name=>'$viewName',global=>true,replace=>true)""", 0)
+
+      // 4: query the global view from another session. Global temp views are registered in the
+      // `global_temp` database (Spark's default for spark.sql.globalTempDatabase), so they must
+      // be qualified. Querying via `spark` with the bare name would silently hit the
+      // session-local view from step 1 and prove nothing.
+      assertSingleValue(spark.newSession(), s"""select count(1) from global_temp.$viewName""", 4L)
+    }
+  }
+
+  /**
+   * Creates Hudi table `tableName` at `basePath/tableName`, with primary key `id` and precombine
+   * field `ts`. Then inserts four rows (ids 1 to 4), one insert statement each.
+   */
+  private def createTableWithFourRecords(basePath: String, tableName: String): Unit = {
+    spark.sql(
+      s"""
+         |create table $tableName (
+         |  id int,
+         |  name string,
+         |  price double,
+         |  ts long
+         |) using hudi
+         | location '$basePath/$tableName'
+         | tblproperties (
+         |  primaryKey = 'id',
+         |  preCombineField = 'ts'
+         | )
+       """.stripMargin)
+
+    spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+    spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+    spark.sql(s"insert into $tableName select 3, 'a3', 30, 2000")
+    spark.sql(s"insert into $tableName select 4, 'a4', 40, 2500")
+  }
+
+  /** Runs `sqlText` in `session` and asserts it returns exactly one row whose first column equals `expected`. */
+  private def assertSingleValue(session: org.apache.spark.sql.SparkSession, sqlText: String, expected: Any): Unit = {
+    val rows = session.sql(sqlText).collectAsList()
+    assert(rows.size() == 1 && rows.get(0).get(0) == expected,
+      s"Expected a single row with value $expected for [$sqlText], got $rows")
+  }
+}