You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2021/08/12 12:00:25 UTC

[hudi] branch asf-site updated: [HUDI-2063] Add Doc For Spark Sql Integrates With Hudi (#3140)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/asf-site by this push:
     new 252e906  [HUDI-2063] Add Doc For Spark Sql Integrates With Hudi (#3140)
252e906 is described below

commit 252e906933518383a212620985fe6824d956f2d2
Author: pengzhiwei <pe...@icloud.com>
AuthorDate: Thu Aug 12 20:00:11 2021 +0800

    [HUDI-2063] Add Doc For Spark Sql Integrates With Hudi (#3140)
---
 website/docs/querying_data.md     |  10 +-
 website/docs/quick-start-guide.md | 444 ++++++++++++++++++++++++++++++++++++--
 2 files changed, 439 insertions(+), 15 deletions(-)

diff --git a/website/docs/querying_data.md b/website/docs/querying_data.md
index a07d4a6..8a4399a 100644
--- a/website/docs/querying_data.md
+++ b/website/docs/querying_data.md
@@ -119,6 +119,11 @@ By default, Spark SQL will try to use its own parquet reader instead of Hive Ser
 both parquet and avro data, this default setting needs to be turned off using set `spark.sql.hive.convertMetastoreParquet=false`. 
 This will force Spark to fallback to using the Hive Serde to read the data (planning/executions is still Spark). 
 
+**NOTICE**
+
+Since 0.9.0 hudi will sync the table to hive as a spark datasource table. So we do not need the `spark.sql.hive.convertMetastoreParquet=false`
+config anymore for the table synced by 0.9.0+ version.
+
 ```java
 $ spark-shell --driver-class-path /etc/hive/conf  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.3,org.apache.spark:spark-avro_2.11:2.4.4 --conf spark.sql.hive.convertMetastoreParquet=false --num-executors 10 --driver-memory 7g --executor-memory 2g  --master yarn-client
 
@@ -141,14 +146,13 @@ and executors. Alternatively, hudi-spark-bundle can also fetched via the `--pack
 
 ### Snapshot query {#spark-snap-query}
 This method can be used to retrieve the data table at the present point in time.
-Note: The file path must be suffixed with a number of wildcard asterisk (`/*`) one greater than the number of partition levels. Eg: with table file path "tablePath" partitioned by columns "a", "b", and "c", the load path must be `tablePath + "/*/*/*/*"`
 
 ```scala
 val hudiIncQueryDF = spark
      .read()
-     .format("org.apache.hudi")
+     .format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL())
-     .load(tablePath + "/*") //The number of wildcard asterisks here must be one greater than the number of partition
+     .load(tablePath) 
 ```
 
 ### Incremental query {#spark-incr-query}
diff --git a/website/docs/quick-start-guide.md b/website/docs/quick-start-guide.md
index 3deb102..a6ed3ba 100644
--- a/website/docs/quick-start-guide.md
+++ b/website/docs/quick-start-guide.md
@@ -22,6 +22,7 @@ defaultValue="scala"
 values={[
 { label: 'Scala', value: 'scala', },
 { label: 'Python', value: 'python', },
+{ label: 'SparkSQL', value: 'sparksql', },
 ]}>
 <TabItem value="scala">
 
@@ -43,6 +44,28 @@ spark-shell \
 ```
 
 </TabItem>
+<TabItem value="sparksql">
+
+Hudi support using spark sql to write and read data with the **HoodieSparkSessionExtension** sql extension.
+```shell
+# spark sql for spark 3
+spark-sql --packages org.apache.hudi:hudi-spark3-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:3.0.1 \
+--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
+--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
+
+# spark-sql for spark 2 with scala 2.11
+spark-sql --packages org.apache.hudi:hudi-spark-bundle_2.11:0.9.0,org.apache.spark:spark-avro_2.11:2.4.4 \
+--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
+--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
+
+# spark-sql for spark 2 with scala 2.12
+spark-sql \
+  --packages org.apache.hudi:hudi-spark-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:2.4.4 \
+  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
+  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
+```
+
+</TabItem>
 <TabItem value="python">
 
 ```python
@@ -119,6 +142,137 @@ The [DataGenerator](https://github.com/apache/hudi/blob/master/hudi-spark/src/ma
 can generate sample inserts and updates based on the the sample trip schema [here](https://github.com/apache/hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L57)
 :::
 
+## Create Table
+
+Hudi support create table using spark-sql.
+
+- COW & MOR table
+  
+  We can create a COW table or MOR table by specified the **type** option. **type = 'cow'** represents COW table, while **type = 'mor'** represents MOR table.
+
+
+- Partitioned & Non-Partitioned table
+
+ We can use the **partitioned by** statement to specified the partition columns to create a partitioned table.
+  
+- Managed & External table
+
+ If the table has not specified the **location** statement, it is a managed table. Or else it is an exteranl table.
+  
+- Table with primary key
+
+ If the table has specified the **primaryKey** column in options, it is a table with primary key.
+  
+
+**Create Non-Partitioned Table**
+
+Here is an example of creating a non-partitioned COW managed table with a primary key 'id'.
+
+```sql
+-- create a managed cow table
+create table if not exists h0(
+  id int, 
+  name string, 
+  price double
+) using hudi
+options (
+  type = 'cow',
+  primaryKey = 'id'
+);
+```
+
+This is an example of creating an MOR external table which specified the table's location. The **preCombineField** option
+is used to specify the preCombine field for merge.
+
+```sql
+-- creae an external mor table
+create table if not exists h1(
+  id int, 
+  name string, 
+  price double,
+  ts bigint
+) using hudi
+location '/tmp/hudi/h0'  
+options (
+  type = 'mor',
+  primaryKey = 'id,name',
+  preCombineField = 'ts' 
+)
+;
+```
+
+Here is the example of creating a COW table without primary key.
+
+```sql
+-- create a non-primary key table
+create table if not exists h2(
+  id int, 
+  name string, 
+  price double
+) using hudi
+options (
+  type = 'cow'
+);
+```
+**Create Partitioned Table**
+```sql
+create table if not exists h_p0 (
+id bigint,
+name string,
+dt string,
+hh string  
+) using hudi
+location '/tmp/hudi/h_p0'
+options (
+  type = 'cow',
+  primaryKey = 'id',
+  preCombineField = 'ts'
+ ) 
+partitioned by (dt, hh)
+;
+```
+**Create Table On The Exists Table Path**
+
+We can create a table on an exists hudi table path. This is useful to read/write from a pre-exists hudi table by spark-sql.
+```sql
+ create table h_p1 using hudi 
+ options (
+    primaryKey = 'id',
+    preCombineField = 'ts'
+ )
+ partitioned by (dt)
+ location '/path/to/hudi'
+```
+
+**CTAS**
+
+Hudi support CTAS on spark sql. For better performance to loading data to hudi table, CTAS use the **bulk insert** 
+write operation.
+```sql
+-- create a partitioned COW table
+create table h2 using hudi
+options (type = 'cow', primaryKey = 'id')
+partitioned by (dt)
+as
+select 1 as id, 'a1' as name, 10 as price, 1000 as dt
+;
+
+-- create a non-partitioned COW table.
+create table h3 using hudi
+as
+select 1 as id, 'a1' as name, 10 as price
+;
+```
+
+**Create Table Options**
+
+| Parameter Name | Introduction |
+|------------|--------|
+| primaryKey | The primary key names of the table, multiple fields separated by commas. |
+| type       | The table type to create. type = 'cow' means a COPY-ON-WRITE table,while type = 'mor' means a MERGE-ON-READ table. Default value is 'cow' without specified this option.|
+| preCombineField | The Pre-Combine field of the table. |
+
+To set custom hudi config, see the  "Set hudi config section" would help.
 
 ## Insert data
 
@@ -129,6 +283,7 @@ defaultValue="scala"
 values={[
 { label: 'Scala', value: 'scala', },
 { label: 'Python', value: 'python', },
+{ label: 'SparkSQL', value: 'sparksql', },
 ]}>
 <TabItem value="scala">
 
@@ -147,6 +302,55 @@ df.write.format("hudi").
 ``` 
 
 </TabItem>
+<TabItem value="sparksql">
+    
+```sql
+insert into h0 select 1, 'a1', 20;
+
+-- insert static partition
+insert into h_p0 partition(dt = '2021-01-02') select 1, 'a1';
+
+-- insert dynamic partition
+insert into h_p0 select 1, 'a1', dt;
+
+-- insert dynamic partition
+insert into h_p1 select 1 as id, 'a1', '2021-01-03' as dt, '19' as hh;
+
+-- insert overwrite table
+insert overwrite table h0 select 1, 'a1', 20;
+
+-- insert overwrite table with static partition
+insert overwrite h_p0 partition(dt = '2021-01-02') select 1, 'a1';
+
+- insert overwrite table with dynamic partition
+  insert overwrite table h_p1 select 2 as id, 'a2', '2021-01-03' as dt, '19' as hh;
+```
+
+**NOTICE**
+
+1. Insert mode
+
+Hudi support three insert modes when inserting data to a table with primary key(we call it pk-table as followed):
+- upsert
+  
+  This it the default insert mode. For upsert mode, insert statement do the upsert operation for the pk-table which will update the duplicate record
+- strict
+
+For strict mode, insert statement will keep the primary key uniqueness constraint for COW table which do not allow duplicate record.
+If inserting a record which the primary key is already exists to the table, a HoodieDuplicateKeyException will throw out
+for COW table. For MOR table, it has the same behavior with "upsert" mode.
+
+- non-strict
+
+For non-strict mode, hudi just do the insert operation for the pk-table.
+
+We can set the insert mode by the config: **hoodie.sql.insert.mode**
+
+2. Bulk Insert
+By default, hudi use the normal insert operation for insert statement. We can set **hoodie.sql.bulk.insert.enable** to true to enable 
+the bulk insert for insert statement.
+   
+</TabItem>
 <TabItem value="python">
 
 ```python
@@ -200,6 +404,7 @@ defaultValue="scala"
 values={[
 { label: 'Scala', value: 'scala', },
 { label: 'Python', value: 'python', },
+{ label: 'SparkSQL', value: 'sparksql', },
 ]}>
 <TabItem value="scala">
 
@@ -208,7 +413,7 @@ values={[
 val tripsSnapshotDF = spark.
   read.
   format("hudi").
-  load(basePath + "/*/*/*/*")
+  load(basePath)
 //load(basePath) use "/partitionKey=partitionValue" folder structure for Spark auto partition discovery
 tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
 
@@ -216,7 +421,37 @@ spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where
 spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()
 ```
 
+**Time Travel Query**
+
+Hudi support time travel query since 0.9.0. Currently three query time format are supported:
+```scala
+spark.read.
+  format("hudi").
+  option("as.of.instant", "20210728141108").
+  load(basePath)
+
+spark.read.
+  format("hudi").
+  option("as.of.instant", "2021-07-28 14: 11: 08").
+  load(basePath)
+
+// It is equal to "as.of.instant = 2021-07-28 00:00:00"
+spark.read.
+  format("hudi").
+  option("as.of.instant", "2021-07-28").
+  load(basePath)
+
+```
+
+
 </TabItem>
+<TabItem value="sparksql">
+
+```sql
+ select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0
+```
+</TabItem>
+
 <TabItem value="python">
 
 ```python
@@ -224,7 +459,7 @@ spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_pat
 tripsSnapshotDF = spark. \
   read. \
   format("hudi"). \
-  load(basePath + "/*/*/*/*")
+  load(basePath)
 # load(basePath) use "/partitionKey=partitionValue" folder structure for Spark auto partition discovery
 
 tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
@@ -237,8 +472,12 @@ spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_pat
 </Tabs>
 
 :::info
-This query provides snapshot querying of the ingested data. Since our partition path (`region/country/city`) is 3 levels nested 
-from base path we ve used `load(basePath + "/*/*/*/*")`. 
+Since 0.9.0 hudi has support a hudi built-in FileIndex: **HoodieFileIndex** to query hudi table,
+which has support partition prune and metatable for query. This will help improve query performance.
+It also supports non-global query path which means users can query the table by the base path without
+specify the "*" in the query path.
+This feature has enabled by default for the non-global query path. For the global query path, we will
+rollback to the old query way.
 Refer to [Table types and queries](/docs/concepts#table-types--queries) for more info on all table types and query types supported.
 :::
 
@@ -252,6 +491,7 @@ defaultValue="scala"
 values={[
 { label: 'Scala', value: 'scala', },
 { label: 'Python', value: 'python', },
+{ label: 'SparkSQL', value: 'sparksql', },
 ]}>
 <TabItem value="scala">
 
@@ -270,6 +510,90 @@ df.write.format("hudi").
 ```
 
 </TabItem>
+<TabItem value="sparksql">
+
+Spark sql support two kinds of DML to udpate hudi table: Merge-Into and Update.
+
+###MergeInto
+
+Hudi support merge-into for both spark 2 & spark 3.
+
+**Syntax**
+
+```sql
+MERGE INTO tableIdentifier AS target_alias
+USING (sub_query | tableIdentifier) AS source_alias
+ON <merge_condition>
+[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
+[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
+[ WHEN NOT MATCHED [ AND <condition> ]  THEN <not_matched_action> ]
+
+<merge_condition> =A equal bool condition 
+<matched_action>  =
+  DELETE  |
+  UPDATE SET *  |
+  UPDATE SET column1 = expression1 [, column2 = expression2 ...]
+<not_matched_action>  =
+  INSERT *  |
+  INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...])
+```
+**Case**
+```sql
+merge into h0 as target
+using (
+  select id, name, price, flag from s
+) source
+on target.id = source.id
+when matched then update set *
+when not matched then insert *
+;
+
+merge into h0
+using (
+  select id, name, price, flag from s
+) source
+on h0.id = source.id
+when matched and flag != 'delete' then update set id = source.id, name = source.name, price = source.price * 2
+when matched and flag = 'delete' then delete
+when not matched then insert (id,name,price) values(id, name, price)
+;
+```
+**Notice**
+
+1、The merge-on condition must be the primary keys currently.
+2、Merge-On-Read table has not support partial update.
+e.g.
+```sql
+ merge into h0 using s0
+ on h0.id = s0.id
+ when matched then update set price = s0.price * 2
+```
+This works well for Cow-On-Write table which support update only the **price** field. 
+For Merge-ON-READ table this will be supported in the future.
+
+3、Target table's fields cannot be the right-value of the update expression for Merge-On-Read table.
+e.g.
+```sql
+ merge into h0 using s0
+ on h0.id = s0.id
+ when matched then update set id = s0.id, 
+                   name = h0.name,
+                   price = s0.price + h0.price
+```
+This can work well for Cow-On-Write table,  for Merge-ON-READ table this will be supported in the future.
+
+### Update
+**Syntax**
+```sql
+ UPDATE tableIdentifier SET column = EXPRESSION(,column = EXPRESSION) [ WHERE boolExpression]
+```
+**Case**
+```sql
+ update h0 set price = price + 20 where id = 1;
+ update h0 set price = price *2, name = 'a2' where id = 2;
+```
+
+</TabItem>
 <TabItem value="python">
 
 ```python
@@ -311,7 +635,7 @@ values={[
 spark.
   read.
   format("hudi").
-  load(basePath + "/*/*/*/*").
+  load(basePath).
   createOrReplaceTempView("hudi_trips_snapshot")
 
 val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
@@ -336,7 +660,7 @@ spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hu
 spark. \
   read. \
   format("hudi"). \
-  load(basePath + "/*/*/*/*"). \
+  load(basePath). \
   createOrReplaceTempView("hudi_trips_snapshot")
 
 commits = list(map(lambda row: row[0], spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").limit(50).collect()))
@@ -426,6 +750,7 @@ defaultValue="scala"
 values={[
 { label: 'Scala', value: 'scala', },
 { label: 'Python', value: 'python', },
+{ label: 'SparkSQL', value: 'sparksql', },
 ]}>
 <TabItem value="scala">
 
@@ -454,7 +779,7 @@ df.write.format("hudi").
 val roAfterDeleteViewDF = spark.
   read.
   format("hudi").
-  load(basePath + "/*/*/*/*")
+  load(basePath)
 
 roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
 // fetch should return (total - 2) records
@@ -462,6 +787,18 @@ spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
 ```
 
 </TabItem>
+<TabItem value="sparksql">
+
+**Syntax**
+```sql
+ DELETE FROM tableIdentifier [ WHERE BOOL_EXPRESSION]
+```
+**Case**
+```sql
+delete from h0 where id = 1;
+```
+
+</TabItem>
 <TabItem value="python">
 
 ```python
@@ -495,7 +832,7 @@ df.write.format("hudi"). \
 roAfterDeleteViewDF = spark. \
   read. \
   format("hudi"). \
-  load(basePath + "/*/*/*/*") 
+  load(basePath) 
 roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
 # fetch should return (total - 2) records
 spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
@@ -526,7 +863,7 @@ in `Overwrite` mode.
 // spark-shell
 spark.
   read.format("hudi").
-  load(basePath + "/*/*/*/*").
+  load(basePath).
   select("uuid","partitionpath").
   show(10, false)
 
@@ -545,12 +882,20 @@ df.write.format("hudi").
 // Should have different keys now, from query before.
 spark.
   read.format("hudi").
-  load(basePath + "/*/*/*/*").
+  load(basePath).
   select("uuid","partitionpath").
   show(10, false)
 
 ``` 
 
+**NOTICE**
+
+The insert overwrite non-partitioned table sql statement will convert to the ***insert_overwrite_table*** operation.
+e.g.
+```sql
+insert overwrite table h0 select 1, 'a1', 20;
+```
+
 ## Insert Overwrite 
 
 Generate some new trips, overwrite the all the partitions that are present in the input. This operation can be faster
@@ -562,7 +907,7 @@ steps in the upsert write path completely.
 // spark-shell
 spark.
   read.format("hudi").
-  load(basePath + "/*/*/*/*").
+  load(basePath).
   select("uuid","partitionpath").
   sort("partitionpath","uuid").
   show(100, false)
@@ -584,11 +929,86 @@ df.write.format("hudi").
 // Should have different keys now for San Francisco alone, from query before.
 spark.
   read.format("hudi").
-  load(basePath + "/*/*/*/*").
+  load(basePath).
   select("uuid","partitionpath").
   sort("partitionpath","uuid").
   show(100, false)
 ```
+**NOTICE**
+
+The insert overwrite partitioned table sql statement will convert to the ***insert_overwrite*** operation.
+e.g.
+```sql
+insert overwrite table h_p1 select 2 as id, 'a2', '2021-01-03' as dt, '19' as hh;
+```
+## Other Spark Sql Command
+
+### AlterTable
+**Syntx**
+```sql
+-- Alter table name
+ALTER TABLE oldTableName RENAME TO newTableName
+
+-- Alter table add columns
+ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*)
+
+-- Alter table column type
+ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType
+```
+**Case**
+```sql
+alter table h0 rename to h0_1;
+
+alter table h0_1 add columns(ext0 string);
+
+alter tablee h0_1 change column id id bigint;
+```
+
+## Set hudi config
+### Use set command
+You can use the **set** command to set the hudi's config, which will work for the
+whole spark session scope.
+```sql
+set hoodie.insert.shuffle.parallelism = 100;
+set hoodie.upsert.shuffle.parallelism = 100;
+set hoodie.delete.shuffle.parallelism = 100;
+```
+
+### Set with table options
+You can also set the config in the table's options when creating table which will work for
+the table scope only and override the config set by the SET command.
+```sql
+create table if not exists h3(
+  id bigint, 
+  name string, 
+  price double
+) using hudi
+options (
+  primaryKey = 'id',
+  type = 'mor',
+  ${hoodie.config.key1} = '${hoodie.config.value2}',
+  ${hoodie.config.key2} = '${hoodie.config.value2}',
+  ....
+);
+e.g.
+create table if not exists h3(
+  id bigint, 
+  name string, 
+  price double
+) using hudi
+options (
+  primaryKey = 'id',
+  type = 'mor',
+  hoodie.cleaner.fileversions.retained = '20',
+  hoodie.keep.max.commits = '20'
+);
+```
+You can alter the config in the table options by the **ALTER SERDEPROPERTIES**
+
+e.g.
+```sql
+ alter table h3 set serdeproperties (hoodie.keep.max.commits = '10') 
+```
 
 ## Where to go from here?