Posted to commits@hudi.apache.org by yi...@apache.org on 2022/09/15 23:40:18 UTC

[hudi] branch asf-site updated: [HUDI-4759] Update website with validations (#6683)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/asf-site by this push:
     new b5a2645c08 [HUDI-4759] Update website with validations (#6683)
b5a2645c08 is described below

commit b5a2645c0840bea97c06db2ca1cc4a15ea7a33f6
Author: Jon Vexler <jo...@onehouse.ai>
AuthorDate: Thu Sep 15 19:40:12 2022 -0400

    [HUDI-4759] Update website with validations (#6683)
---
 website/docs/quick-start-guide.md                  | 87 ++++++++++++++++++++--
 .../version-0.12.0/quick-start-guide.md            | 87 ++++++++++++++++++++--
 2 files changed, 160 insertions(+), 14 deletions(-)
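
The new snippets broadly follow the same validation pattern: take a snapshot of the table before a write, perform the write, re-read the table, and assert the expected set relationship between the two snapshots using Spark's DataFrame set operations (intersect, except/exceptAll). Below is a minimal, self-contained PySpark sketch of that pattern; it assumes pyspark was launched with the Hudi Spark bundle on the classpath, and the table name, base path, and toy rows are illustrative only, not taken from the quickstart guide.

```python
# Minimal sketch of the snapshot-based validation pattern added in this change.
# Assumes pyspark was started with the Hudi Spark bundle (and Kryo serializer)
# configured; the table name, path, and sample rows are illustrative only.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("hudi-validation-sketch").getOrCreate()

base_path = "file:///tmp/hudi_validation_sketch"
hudi_options = {
    "hoodie.table.name": "validation_sketch",
    "hoodie.datasource.write.recordkey.field": "uuid",
    "hoodie.datasource.write.partitionpath.field": "partitionpath",
    "hoodie.datasource.write.precombine.field": "ts",
}

def snapshot():
    # Re-read the table, keeping only the data columns so it lines up with the input frames.
    return spark.read.format("hudi").load(base_path).select("uuid", "partitionpath", "ts", "fare")

# Initial insert: every written row should be visible in the snapshot afterwards.
inserts = spark.createDataFrame(
    [("id-1", "p1", 1, 10.0), ("id-2", "p1", 1, 20.0)],
    ["uuid", "partitionpath", "ts", "fare"])
inserts.write.format("hudi").options(**hudi_options).mode("overwrite").save(base_path)
assert inserts.exceptAll(snapshot()).count() == 0

# Upsert one record, then validate against the pre-update snapshot:
# the updated row is present, and every other row carried over unchanged.
before_update = snapshot()
updates = spark.createDataFrame([("id-1", "p1", 2, 15.0)], ["uuid", "partitionpath", "ts", "fare"])
updates.write.format("hudi").options(**hudi_options).mode("append").save(base_path)
after_update = snapshot()
assert after_update.intersect(updates).count() == updates.count()
assert after_update.exceptAll(updates).exceptAll(before_update).count() == 0
```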

diff --git a/website/docs/quick-start-guide.md b/website/docs/quick-start-guide.md
index 5e6e4e6810..4beb9577c5 100644
--- a/website/docs/quick-start-guide.md
+++ b/website/docs/quick-start-guide.md
@@ -189,6 +189,7 @@ import org.apache.hudi.common.model.HoodieRecord
 val tableName = "hudi_trips_cow"
 val basePath = "file:///tmp/hudi_trips_cow"
 val dataGen = new DataGenerator
+val snapshotQuery = "SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table"
 ```
 
 </TabItem>
@@ -199,6 +200,7 @@ val dataGen = new DataGenerator
 tableName = "hudi_trips_cow"
 basePath = "file:///tmp/hudi_trips_cow"
 dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
+snapshotQuery = "SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table"
 ```
 
 </TabItem>
@@ -427,6 +429,9 @@ df.write.format("hudi").
   option(TABLE_NAME, tableName).
   mode(Overwrite).
   save(basePath)
+  
+// validations
+assert(df.except(spark.sql(snapshotQuery)).count() == 0)
 ```
 :::info
 `mode(Overwrite)` overwrites and recreates the table if it already exists.
@@ -459,10 +464,13 @@ hudi_options = {
     'hoodie.insert.shuffle.parallelism': 2
 }
 
-df.write.format("hudi").
-    options(**hudi_options).
-    mode("overwrite").
+df.write.format("hudi"). \
+    options(**hudi_options). \
+    mode("overwrite"). \
     save(basePath)
+    
+# validations
+assert spark.sql(snapshotQuery).exceptAll(df).count() == 0
 ```
 :::info
 `mode(Overwrite)` overwrites and recreates the table if it already exists.
@@ -638,7 +646,7 @@ spark.read. \
   option("as.of.instant", "2021-07-28 14: 11: 08"). \
   load(basePath)
 
-// It is equal to "as.of.instant = 2021-07-28 00:00:00"
+# It is equal to "as.of.instant = 2021-07-28 00:00:00"
 spark.read. \
   format("hudi"). \
   option("as.of.instant", "2021-07-28"). \
@@ -705,6 +713,7 @@ values={[
 
 ```scala
 // spark-shell
+val snapBeforeUpdate = spark.sql(snapshotQuery)
 val updates = convertToStringList(dataGen.generateUpdates(10))
 val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
 df.write.format("hudi").
@@ -715,6 +724,10 @@ df.write.format("hudi").
   option(TABLE_NAME, tableName).
   mode(Append).
   save(basePath)
+  
+// validations
+assert(spark.sql(snapshotQuery).intersect(df).count() == df.count())
+assert(spark.sql(snapshotQuery).except(df).except(snapBeforeUpdate).count() == 0)
 ```
 :::note
 Notice that the save mode is now `Append`. In general, always use append mode unless you are trying to create the table for the first time.
@@ -803,12 +816,17 @@ when not matched then
 
 ```python
 # pyspark
+snapshotBeforeUpdate = spark.sql(snapshotQuery)
 updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
 df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
 df.write.format("hudi"). \
   options(**hudi_options). \
   mode("append"). \
   save(basePath)
+  
+# validations
+assert spark.sql(snapshotQuery).intersect(df).count() == df.count()
+assert spark.sql(snapshotQuery).exceptAll(snapshotBeforeUpdate).exceptAll(df).count() == 0
 ```
 :::note
 Notice that the save mode is now `Append`. In general, always use append mode unless you are trying to create the table for the first time.
@@ -1047,7 +1065,7 @@ meta_columns = ["_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_k
   "_hoodie_partition_path", "_hoodie_file_name"]
 excluded_columns = meta_columns + ["ts", "uuid", "partitionpath"]
 nullify_columns = list(filter(lambda field: field[0] not in excluded_columns, \
-  list(map(lambda field: (field.name, field.dataType), softDeleteDs.schema.fields))))
+  list(map(lambda field: (field.name, field.dataType), soft_delete_ds.schema.fields))))
 
 hudi_soft_delete_options = {
   'hoodie.table.name': tableName,
@@ -1104,6 +1122,7 @@ Delete records for the HoodieKeys passed in.<br/>
 
 ```scala
 // spark-shell
+val snapshotBeforeDelete = spark.sql(snapshotQuery)
 // fetch total records count
 spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
 // fetch two records to be deleted
@@ -1115,7 +1134,7 @@ val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
 
 hardDeleteDf.write.format("hudi").
   options(getQuickstartWriteConfigs).
-  option(OPERATION_OPT_KEY,"delete").
+  option(OPERATION_OPT_KEY, "delete").
   option(PRECOMBINE_FIELD_OPT_KEY, "ts").
   option(RECORDKEY_FIELD_OPT_KEY, "uuid").
   option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
@@ -1132,6 +1151,10 @@ val roAfterDeleteViewDF = spark.
 roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
 // fetch should return (total - 2) records
 spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+
+// validations
+assert(spark.sql("select uuid, partitionpath, ts from hudi_trips_snapshot").intersect(hardDeleteDf).count() == 0)
+assert(snapshotBeforeDelete.except(spark.sql("select uuid, partitionpath, ts from hudi_trips_snapshot")).except(snapshotBeforeDelete).count() == 0)
 ```
 :::note
 Only `Append` mode is supported for delete operation.
@@ -1159,6 +1182,7 @@ Delete records for the HoodieKeys passed in.<br/>
 
 ```python
 # pyspark
+snapshotBeforeDelete = spark.sql(snapshotQuery)
 # fetch total records count
 spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
 # fetch two records to be deleted
@@ -1189,9 +1213,13 @@ roAfterDeleteViewDF = spark. \
   read. \
   format("hudi"). \
   load(basePath) 
-roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
+roAfterDeleteViewDF.createOrReplaceTempView("hudi_trips_snapshot")
 # fetch should return (total - 2) records
 spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+
+# validations
+assert spark.sql("select uuid, partitionpath, ts from hudi_trips_snapshot").intersect(hard_delete_df).count() == 0
+assert snapshotBeforeDelete.exceptAll(spark.sql("select uuid, partitionpath, ts from hudi_trips_snapshot")).count() == 0
 ```
 :::note
 Only `Append` mode is supported for delete operation.
@@ -1212,6 +1240,7 @@ steps in the upsert write path completely.
 defaultValue="scala"
 values={[
 { label: 'Scala', value: 'scala', },
+{ label: 'Python', value: 'python', },
 { label: 'Spark SQL', value: 'sparksql', },
 ]}
 >
@@ -1227,6 +1256,7 @@ spark.
   sort("partitionpath","uuid").
   show(100, false)
 
+val snapshotBeforeOverwrite = spark.sql(snapshotQuery)
 val inserts = convertToStringList(dataGen.generateInserts(10))
 val df = spark.
   read.json(spark.sparkContext.parallelize(inserts, 2)).
@@ -1248,6 +1278,49 @@ spark.
   select("uuid","partitionpath").
   sort("partitionpath","uuid").
   show(100, false)
+  
+// validations
+val withoutSanFran = snapshotBeforeOverwrite.filter("partitionpath != 'americas/united_states/san_francisco'")
+val expectedDf = withoutSanFran.union(df)
+assert(spark.sql(snapshotQuery).except(expectedDf).count() == 0)
+```
+</TabItem>
+
+<TabItem value="python">
+
+```python
+# pyspark
+snapshotBeforeOverwrite = spark.sql(snapshotQuery)
+spark.read.format("hudi"). \
+    load(basePath). \
+    select(["uuid", "partitionpath"]). \
+    sort(["partitionpath", "uuid"]). \
+    show(n=100, truncate=False)
+
+inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10)) 
+df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)). \
+    filter("partitionpath = 'americas/united_states/san_francisco'")
+hudi_insert_overwrite_options = {
+    'hoodie.table.name': tableName,
+    'hoodie.datasource.write.recordkey.field': 'uuid',
+    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+    'hoodie.datasource.write.table.name': tableName,
+    'hoodie.datasource.write.operation': 'insert_overwrite',
+    'hoodie.datasource.write.precombine.field': 'ts',
+    'hoodie.upsert.shuffle.parallelism': 2,
+    'hoodie.insert.shuffle.parallelism': 2
+}
+df.write.format("hudi").options(**hudi_insert_overwrite_options).mode("append").save(basePath)
+spark.read.format("hudi"). \
+    load(basePath). \
+    select(["uuid", "partitionpath"]). \
+    sort(["partitionpath", "uuid"]). \
+    show(n=100, truncate=False)
+
+# validations
+withoutSanFran = snapshotBeforeOverwrite.filter("partitionpath != 'americas/united_states/san_francisco'")
+expectedDf = withoutSanFran.union(df)
+assert spark.sql(snapshotQuery).exceptAll(expectedDf).count() == 0
 ```
 </TabItem>
 
diff --git a/website/versioned_docs/version-0.12.0/quick-start-guide.md b/website/versioned_docs/version-0.12.0/quick-start-guide.md
index 494be9858d..2610a88f30 100644
--- a/website/versioned_docs/version-0.12.0/quick-start-guide.md
+++ b/website/versioned_docs/version-0.12.0/quick-start-guide.md
@@ -189,6 +189,7 @@ import org.apache.hudi.common.model.HoodieRecord
 val tableName = "hudi_trips_cow"
 val basePath = "file:///tmp/hudi_trips_cow"
 val dataGen = new DataGenerator
+val snapshotQuery = "SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table"
 ```
 
 </TabItem>
@@ -199,6 +200,7 @@ val dataGen = new DataGenerator
 tableName = "hudi_trips_cow"
 basePath = "file:///tmp/hudi_trips_cow"
 dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
+snapshotQuery = "SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table"
 ```
 
 </TabItem>
@@ -427,6 +429,9 @@ df.write.format("hudi").
   option(TABLE_NAME, tableName).
   mode(Overwrite).
   save(basePath)
+  
+// validations
+assert(df.except(spark.sql(snapshotQuery)).count() == 0)
 ```
 :::info
 `mode(Overwrite)` overwrites and recreates the table if it already exists.
@@ -459,10 +464,13 @@ hudi_options = {
     'hoodie.insert.shuffle.parallelism': 2
 }
 
-df.write.format("hudi").
-    options(**hudi_options).
-    mode("overwrite").
+df.write.format("hudi"). \
+    options(**hudi_options). \
+    mode("overwrite"). \
     save(basePath)
+    
+# validations
+assert spark.sql(snapshotQuery).exceptAll(df).count() == 0
 ```
 :::info
 `mode(Overwrite)` overwrites and recreates the table if it already exists.
@@ -638,7 +646,7 @@ spark.read. \
   option("as.of.instant", "2021-07-28 14: 11: 08"). \
   load(basePath)
 
-// It is equal to "as.of.instant = 2021-07-28 00:00:00"
+# It is equal to "as.of.instant = 2021-07-28 00:00:00"
 spark.read. \
   format("hudi"). \
   option("as.of.instant", "2021-07-28"). \
@@ -705,6 +713,7 @@ values={[
 
 ```scala
 // spark-shell
+val snapBeforeUpdate = spark.sql(snapshotQuery)
 val updates = convertToStringList(dataGen.generateUpdates(10))
 val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
 df.write.format("hudi").
@@ -715,6 +724,10 @@ df.write.format("hudi").
   option(TABLE_NAME, tableName).
   mode(Append).
   save(basePath)
+  
+// validations
+assert(spark.sql(snapshotQuery).intersect(df).count() == df.count())
+assert(spark.sql(snapshotQuery).except(df).except(snapBeforeUpdate).count() == 0)
 ```
 :::note
 Notice that the save mode is now `Append`. In general, always use append mode unless you are trying to create the table for the first time.
@@ -803,12 +816,17 @@ when not matched then
 
 ```python
 # pyspark
+snapshotBeforeUpdate = spark.sql(snapshotQuery)
 updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
 df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
 df.write.format("hudi"). \
   options(**hudi_options). \
   mode("append"). \
   save(basePath)
+  
+# validations
+assert spark.sql(snapshotQuery).intersect(df).count() == df.count()
+assert spark.sql(snapshotQuery).exceptAll(snapshotBeforeUpdate).exceptAll(df).count() == 0
 ```
 :::note
 Notice that the save mode is now `Append`. In general, always use append mode unless you are trying to create the table for the first time.
@@ -1047,7 +1065,7 @@ meta_columns = ["_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_k
   "_hoodie_partition_path", "_hoodie_file_name"]
 excluded_columns = meta_columns + ["ts", "uuid", "partitionpath"]
 nullify_columns = list(filter(lambda field: field[0] not in excluded_columns, \
-  list(map(lambda field: (field.name, field.dataType), softDeleteDs.schema.fields))))
+  list(map(lambda field: (field.name, field.dataType), soft_delete_ds.schema.fields))))
 
 hudi_soft_delete_options = {
   'hoodie.table.name': tableName,
@@ -1104,6 +1122,7 @@ Delete records for the HoodieKeys passed in.<br/>
 
 ```scala
 // spark-shell
+val snapshotBeforeDelete = spark.sql(snapshotQuery)
 // fetch total records count
 spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
 // fetch two records to be deleted
@@ -1115,7 +1134,7 @@ val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
 
 hardDeleteDf.write.format("hudi").
   options(getQuickstartWriteConfigs).
-  option(OPERATION_OPT_KEY,"delete").
+  option(OPERATION_OPT_KEY, "delete").
   option(PRECOMBINE_FIELD_OPT_KEY, "ts").
   option(RECORDKEY_FIELD_OPT_KEY, "uuid").
   option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
@@ -1132,6 +1151,10 @@ val roAfterDeleteViewDF = spark.
 roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
 // fetch should return (total - 2) records
 spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+
+// validations
+assert(spark.sql("select uuid, partitionpath, ts from hudi_trips_snapshot").intersect(hardDeleteDf).count() == 0)
+assert(snapshotBeforeDelete.except(spark.sql("select uuid, partitionpath, ts from hudi_trips_snapshot")).except(snapshotBeforeDelete).count() == 0)
 ```
 :::note
 Only `Append` mode is supported for delete operation.
@@ -1159,6 +1182,7 @@ Delete records for the HoodieKeys passed in.<br/>
 
 ```python
 # pyspark
+snapshotBeforeDelete = spark.sql(snapshotQuery)
 # fetch total records count
 spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
 # fetch two records to be deleted
@@ -1189,9 +1213,13 @@ roAfterDeleteViewDF = spark. \
   read. \
   format("hudi"). \
   load(basePath) 
-roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
+roAfterDeleteViewDF.createOrReplaceTempView("hudi_trips_snapshot")
 # fetch should return (total - 2) records
 spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+
+# validations
+assert spark.sql("select uuid, partitionpath, ts from hudi_trips_snapshot").intersect(hard_delete_df).count() == 0
+assert snapshotBeforeDelete.exceptAll(spark.sql("select uuid, partitionpath, ts from hudi_trips_snapshot")).count() == 0
 ```
 :::note
 Only `Append` mode is supported for delete operation.
@@ -1212,6 +1240,7 @@ steps in the upsert write path completely.
 defaultValue="scala"
 values={[
 { label: 'Scala', value: 'scala', },
+{ label: 'Python', value: 'python', },
 { label: 'Spark SQL', value: 'sparksql', },
 ]}
 >
@@ -1227,6 +1256,7 @@ spark.
   sort("partitionpath","uuid").
   show(100, false)
 
+val snapshotBeforeOverwrite = spark.sql(snapshotQuery)
 val inserts = convertToStringList(dataGen.generateInserts(10))
 val df = spark.
   read.json(spark.sparkContext.parallelize(inserts, 2)).
@@ -1248,6 +1278,49 @@ spark.
   select("uuid","partitionpath").
   sort("partitionpath","uuid").
   show(100, false)
+  
+// validations
+val withoutSanFran = snapshotBeforeOverwrite.filter("partitionpath != 'americas/united_states/san_francisco'")
+val expectedDf = withoutSanFran.union(df)
+assert(spark.sql(snapshotQuery).except(expectedDf).count() == 0)
+```
+</TabItem>
+
+<TabItem value="python">
+
+```python
+# pyspark
+snapshotBeforeOverwrite = spark.sql(snapshotQuery)
+spark.read.format("hudi"). \
+    load(basePath). \
+    select(["uuid", "partitionpath"]). \
+    sort(["partitionpath", "uuid"]). \
+    show(n=100, truncate=False)
+
+inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10)) 
+df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)). \
+    filter("partitionpath = 'americas/united_states/san_francisco'")
+hudi_insert_overwrite_options = {
+    'hoodie.table.name': tableName,
+    'hoodie.datasource.write.recordkey.field': 'uuid',
+    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+    'hoodie.datasource.write.table.name': tableName,
+    'hoodie.datasource.write.operation': 'insert_overwrite',
+    'hoodie.datasource.write.precombine.field': 'ts',
+    'hoodie.upsert.shuffle.parallelism': 2,
+    'hoodie.insert.shuffle.parallelism': 2
+}
+df.write.format("hudi").options(**hudi_insert_overwrite_options).mode("append").save(basePath)
+spark.read.format("hudi"). \
+    load(basePath). \
+    select(["uuid", "partitionpath"]). \
+    sort(["partitionpath", "uuid"]). \
+    show(n=100, truncate=False)
+
+# validations
+withoutSanFran = snapshotBeforeOverwrite.filter("partitionpath != 'americas/united_states/san_francisco'")
+expectedDf = withoutSanFran.union(df)
+assert spark.sql(snapshotQuery).exceptAll(expectedDf).count() == 0
 ```
 </TabItem>
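
A side note for readers of the assertions above: Scala's Dataset.except is distinct-based (EXCEPT DISTINCT) while exceptAll keeps duplicate rows, so the two are not interchangeable when duplicates can occur; PySpark exposes the distinct flavor as subtract. A toy illustration with made-up data, unrelated to any Hudi table:

```python
# Toy comparison of the two "except" flavors; data is made up for illustration.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("except-vs-exceptall").getOrCreate()

left = spark.createDataFrame([("a",), ("a",), ("b",)], ["k"])
right = spark.createDataFrame([("a",), ("b",)], ["k"])

assert left.subtract(right).count() == 0    # distinct semantics: nothing remains
assert left.exceptAll(right).count() == 1   # the duplicate "a" survives
```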