You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yi...@apache.org on 2022/09/15 23:40:18 UTC
[hudi] branch asf-site updated: [HUDI-4759] Update website with validations (#6683)
This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/asf-site by this push:
new b5a2645c08 [HUDI-4759] Update website with validations (#6683)
b5a2645c08 is described below
commit b5a2645c0840bea97c06db2ca1cc4a15ea7a33f6
Author: Jon Vexler <jo...@onehouse.ai>
AuthorDate: Thu Sep 15 19:40:12 2022 -0400
[HUDI-4759] Update website with validations (#6683)
---
website/docs/quick-start-guide.md | 87 ++++++++++++++++++++--
.../version-0.12.0/quick-start-guide.md | 87 ++++++++++++++++++++--
2 files changed, 160 insertions(+), 14 deletions(-)
diff --git a/website/docs/quick-start-guide.md b/website/docs/quick-start-guide.md
index 5e6e4e6810..4beb9577c5 100644
--- a/website/docs/quick-start-guide.md
+++ b/website/docs/quick-start-guide.md
@@ -189,6 +189,7 @@ import org.apache.hudi.common.model.HoodieRecord
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator
+val snapshotQuery = "SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table"
```
</TabItem>
@@ -199,6 +200,7 @@ val dataGen = new DataGenerator
tableName = "hudi_trips_cow"
basePath = "file:///tmp/hudi_trips_cow"
dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
+snapshotQuery = "SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table"
```
</TabItem>
@@ -427,6 +429,9 @@ df.write.format("hudi").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)
+
+// validations
+assert(df.except(spark.sql(snapshotQuery)).count() == 0)
```
:::info
`mode(Overwrite)` overwrites and recreates the table if it already exists.
@@ -459,10 +464,13 @@ hudi_options = {
'hoodie.insert.shuffle.parallelism': 2
}
-df.write.format("hudi").
- options(**hudi_options).
- mode("overwrite").
+df.write.format("hudi"). \
+ options(**hudi_options). \
+ mode("overwrite"). \
save(basePath)
+
+# validations
+assert spark.sql(snapshotQuery).exceptAll(df).count() == 0
```
:::info
`mode(Overwrite)` overwrites and recreates the table if it already exists.
@@ -638,7 +646,7 @@ spark.read. \
option("as.of.instant", "2021-07-28 14: 11: 08"). \
load(basePath)
-// It is equal to "as.of.instant = 2021-07-28 00:00:00"
+# It is equal to "as.of.instant = 2021-07-28 00:00:00"
spark.read. \
format("hudi"). \
option("as.of.instant", "2021-07-28"). \
@@ -705,6 +713,7 @@ values={[
```scala
// spark-shell
+val snapBeforeUpdate = spark.sql(snapshotQuery)
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi").
@@ -715,6 +724,10 @@ df.write.format("hudi").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
+
+// validations
+assert(spark.sql(snapshotQuery).intersect(df).count() == df.count())
+assert(spark.sql(snapshotQuery).except(df).except(snapBeforeUpdate).count() == 0)
```
:::note
Notice that the save mode is now `Append`. In general, always use append mode unless you are trying to create the table for the first time.
@@ -803,12 +816,17 @@ when not matched then
```python
# pyspark
+snapshotBeforeUpdate = spark.sql(snapshotQuery)
updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi"). \
options(**hudi_options). \
mode("append"). \
save(basePath)
+
+# validations
+assert spark.sql(snapshotQuery).intersect(df).count() == df.count()
+assert spark.sql(snapshotQuery).exceptAll(snapshotBeforeUpdate).exceptAll(df).count() == 0
```
:::note
Notice that the save mode is now `Append`. In general, always use append mode unless you are trying to create the table for the first time.
@@ -1047,7 +1065,7 @@ meta_columns = ["_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_k
"_hoodie_partition_path", "_hoodie_file_name"]
excluded_columns = meta_columns + ["ts", "uuid", "partitionpath"]
nullify_columns = list(filter(lambda field: field[0] not in excluded_columns, \
- list(map(lambda field: (field.name, field.dataType), softDeleteDs.schema.fields))))
+ list(map(lambda field: (field.name, field.dataType), soft_delete_ds.schema.fields))))
hudi_soft_delete_options = {
'hoodie.table.name': tableName,
@@ -1104,6 +1122,7 @@ Delete records for the HoodieKeys passed in.<br/>
```scala
// spark-shell
+val snapshotBeforeDelete = spark.sql(snapshotQuery)
// fetch total records count
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
// fetch two records to be deleted
@@ -1115,7 +1134,7 @@ val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
hardDeleteDf.write.format("hudi").
options(getQuickstartWriteConfigs).
- option(OPERATION_OPT_KEY,"delete").
+ option(OPERATION_OPT_KEY, "delete").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
@@ -1132,6 +1151,10 @@ val roAfterDeleteViewDF = spark.
roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
// fetch should return (total - 2) records
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+
+// validations: the deleted keys are gone, and no other keys were removed (set ops match columns by position, so compare keys by name)
+assert(spark.sql("select uuid, partitionpath from hudi_trips_snapshot").intersect(hardDeleteDf.select("uuid", "partitionpath")).count() == 0)
+assert(snapshotBeforeDelete.select("uuid", "partitionpath").except(spark.sql("select uuid, partitionpath from hudi_trips_snapshot")).except(hardDeleteDf.select("uuid", "partitionpath")).count() == 0)
```
:::note
Only `Append` mode is supported for delete operation.
@@ -1159,6 +1182,7 @@ Delete records for the HoodieKeys passed in.<br/>
```python
# pyspark
+snapshotBeforeDelete = spark.sql(snapshotQuery)
# fetch total records count
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
# fetch two records to be deleted
@@ -1189,9 +1213,13 @@ roAfterDeleteViewDF = spark. \
read. \
format("hudi"). \
load(basePath)
-roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
+roAfterDeleteViewDF.createOrReplaceTempView("hudi_trips_snapshot")
# fetch should return (total - 2) records
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+
+# validations: the deleted keys are gone, and no other keys were removed (set ops match columns by position, so compare keys by name)
+assert spark.sql("select uuid, partitionpath from hudi_trips_snapshot").intersect(hard_delete_df.select("uuid", "partitionpath")).count() == 0
+assert snapshotBeforeDelete.select("uuid", "partitionpath").exceptAll(spark.sql("select uuid, partitionpath from hudi_trips_snapshot")).exceptAll(hard_delete_df.select("uuid", "partitionpath")).count() == 0
```
:::note
Only `Append` mode is supported for delete operation.
@@ -1212,6 +1240,7 @@ steps in the upsert write path completely.
defaultValue="scala"
values={[
{ label: 'Scala', value: 'scala', },
+{ label: 'Python', value: 'python', },
{ label: 'Spark SQL', value: 'sparksql', },
]}
>
@@ -1227,6 +1256,7 @@ spark.
sort("partitionpath","uuid").
show(100, false)
+val snapshotBeforeOverwrite = spark.sql(snapshotQuery)
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.
read.json(spark.sparkContext.parallelize(inserts, 2)).
@@ -1248,6 +1278,49 @@ spark.
select("uuid","partitionpath").
sort("partitionpath","uuid").
show(100, false)
+
+// validations
+val withoutSanFran = snapshotBeforeOverwrite.filter("partitionpath != 'americas/united_states/san_francisco'")
+val expectedDf = withoutSanFran.union(df)
+assert(spark.sql(snapshotQuery).except(expectedDf).count() == 0)
+```
+</TabItem>
+
+<TabItem value="python">
+
+```python
+# pyspark
+snapshotBeforeOverwrite = spark.sql(snapshotQuery)
+spark.read.format("hudi"). \
+ load(basePath). \
+ select(["uuid", "partitionpath"]). \
+ sort(["partitionpath", "uuid"]). \
+ show(n=100, truncate=False)
+
+inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
+df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)). \
+ filter("partitionpath = 'americas/united_states/san_francisco'")
+hudi_insert_overwrite_options = {
+ 'hoodie.table.name': tableName,
+ 'hoodie.datasource.write.recordkey.field': 'uuid',
+ 'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+ 'hoodie.datasource.write.table.name': tableName,
+ 'hoodie.datasource.write.operation': 'insert_overwrite',
+ 'hoodie.datasource.write.precombine.field': 'ts',
+ 'hoodie.upsert.shuffle.parallelism': 2,
+ 'hoodie.insert.shuffle.parallelism': 2
+}
+df.write.format("hudi").options(**hudi_insert_overwrite_options).mode("append").save(basePath)
+spark.read.format("hudi"). \
+ load(basePath). \
+ select(["uuid", "partitionpath"]). \
+ sort(["partitionpath", "uuid"]). \
+ show(n=100, truncate=False)
+
+# validations
+withoutSanFran = snapshotBeforeOverwrite.filter("partitionpath != 'americas/united_states/san_francisco'")
+expectedDf = withoutSanFran.union(df)
+assert spark.sql(snapshotQuery).exceptAll(expectedDf).count() == 0
```
</TabItem>
diff --git a/website/versioned_docs/version-0.12.0/quick-start-guide.md b/website/versioned_docs/version-0.12.0/quick-start-guide.md
index 494be9858d..2610a88f30 100644
--- a/website/versioned_docs/version-0.12.0/quick-start-guide.md
+++ b/website/versioned_docs/version-0.12.0/quick-start-guide.md
@@ -189,6 +189,7 @@ import org.apache.hudi.common.model.HoodieRecord
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator
+val snapshotQuery = "SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table"
```
</TabItem>
@@ -199,6 +200,7 @@ val dataGen = new DataGenerator
tableName = "hudi_trips_cow"
basePath = "file:///tmp/hudi_trips_cow"
dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
+snapshotQuery = "SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table"
```
</TabItem>
@@ -427,6 +429,9 @@ df.write.format("hudi").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)
+
+// validations
+assert(df.except(spark.sql(snapshotQuery)).count() == 0)
```
:::info
`mode(Overwrite)` overwrites and recreates the table if it already exists.
@@ -459,10 +464,13 @@ hudi_options = {
'hoodie.insert.shuffle.parallelism': 2
}
-df.write.format("hudi").
- options(**hudi_options).
- mode("overwrite").
+df.write.format("hudi"). \
+ options(**hudi_options). \
+ mode("overwrite"). \
save(basePath)
+
+# validations
+assert spark.sql(snapshotQuery).exceptAll(df).count() == 0
```
:::info
`mode(Overwrite)` overwrites and recreates the table if it already exists.
@@ -638,7 +646,7 @@ spark.read. \
option("as.of.instant", "2021-07-28 14: 11: 08"). \
load(basePath)
-// It is equal to "as.of.instant = 2021-07-28 00:00:00"
+# It is equal to "as.of.instant = 2021-07-28 00:00:00"
spark.read. \
format("hudi"). \
option("as.of.instant", "2021-07-28"). \
@@ -705,6 +713,7 @@ values={[
```scala
// spark-shell
+val snapBeforeUpdate = spark.sql(snapshotQuery)
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi").
@@ -715,6 +724,10 @@ df.write.format("hudi").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
+
+// validations
+assert(spark.sql(snapshotQuery).intersect(df).count() == df.count())
+assert(spark.sql(snapshotQuery).except(df).except(snapBeforeUpdate).count() == 0)
```
:::note
Notice that the save mode is now `Append`. In general, always use append mode unless you are trying to create the table for the first time.
@@ -803,12 +816,17 @@ when not matched then
```python
# pyspark
+snapshotBeforeUpdate = spark.sql(snapshotQuery)
updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi"). \
options(**hudi_options). \
mode("append"). \
save(basePath)
+
+# validations
+assert spark.sql(snapshotQuery).intersect(df).count() == df.count()
+assert spark.sql(snapshotQuery).exceptAll(snapshotBeforeUpdate).exceptAll(df).count() == 0
```
:::note
Notice that the save mode is now `Append`. In general, always use append mode unless you are trying to create the table for the first time.
@@ -1047,7 +1065,7 @@ meta_columns = ["_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_k
"_hoodie_partition_path", "_hoodie_file_name"]
excluded_columns = meta_columns + ["ts", "uuid", "partitionpath"]
nullify_columns = list(filter(lambda field: field[0] not in excluded_columns, \
- list(map(lambda field: (field.name, field.dataType), softDeleteDs.schema.fields))))
+ list(map(lambda field: (field.name, field.dataType), soft_delete_ds.schema.fields))))
hudi_soft_delete_options = {
'hoodie.table.name': tableName,
@@ -1104,6 +1122,7 @@ Delete records for the HoodieKeys passed in.<br/>
```scala
// spark-shell
+val snapshotBeforeDelete = spark.sql(snapshotQuery)
// fetch total records count
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
// fetch two records to be deleted
@@ -1115,7 +1134,7 @@ val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
hardDeleteDf.write.format("hudi").
options(getQuickstartWriteConfigs).
- option(OPERATION_OPT_KEY,"delete").
+ option(OPERATION_OPT_KEY, "delete").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
@@ -1132,6 +1151,10 @@ val roAfterDeleteViewDF = spark.
roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
// fetch should return (total - 2) records
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+
+// validations: the deleted keys are gone, and no other keys were removed (set ops match columns by position, so compare keys by name)
+assert(spark.sql("select uuid, partitionpath from hudi_trips_snapshot").intersect(hardDeleteDf.select("uuid", "partitionpath")).count() == 0)
+assert(snapshotBeforeDelete.select("uuid", "partitionpath").except(spark.sql("select uuid, partitionpath from hudi_trips_snapshot")).except(hardDeleteDf.select("uuid", "partitionpath")).count() == 0)
```
:::note
Only `Append` mode is supported for delete operation.
@@ -1159,6 +1182,7 @@ Delete records for the HoodieKeys passed in.<br/>
```python
# pyspark
+snapshotBeforeDelete = spark.sql(snapshotQuery)
# fetch total records count
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
# fetch two records to be deleted
@@ -1189,9 +1213,13 @@ roAfterDeleteViewDF = spark. \
read. \
format("hudi"). \
load(basePath)
-roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
+roAfterDeleteViewDF.createOrReplaceTempView("hudi_trips_snapshot")
# fetch should return (total - 2) records
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+
+# validations: the deleted keys are gone, and no other keys were removed (set ops match columns by position, so compare keys by name)
+assert spark.sql("select uuid, partitionpath from hudi_trips_snapshot").intersect(hard_delete_df.select("uuid", "partitionpath")).count() == 0
+assert snapshotBeforeDelete.select("uuid", "partitionpath").exceptAll(spark.sql("select uuid, partitionpath from hudi_trips_snapshot")).exceptAll(hard_delete_df.select("uuid", "partitionpath")).count() == 0
```
:::note
Only `Append` mode is supported for delete operation.
@@ -1212,6 +1240,7 @@ steps in the upsert write path completely.
defaultValue="scala"
values={[
{ label: 'Scala', value: 'scala', },
+{ label: 'Python', value: 'python', },
{ label: 'Spark SQL', value: 'sparksql', },
]}
>
@@ -1227,6 +1256,7 @@ spark.
sort("partitionpath","uuid").
show(100, false)
+val snapshotBeforeOverwrite = spark.sql(snapshotQuery)
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.
read.json(spark.sparkContext.parallelize(inserts, 2)).
@@ -1248,6 +1278,49 @@ spark.
select("uuid","partitionpath").
sort("partitionpath","uuid").
show(100, false)
+
+// validations
+val withoutSanFran = snapshotBeforeOverwrite.filter("partitionpath != 'americas/united_states/san_francisco'")
+val expectedDf = withoutSanFran.union(df)
+assert(spark.sql(snapshotQuery).except(expectedDf).count() == 0)
+```
+</TabItem>
+
+<TabItem value="python">
+
+```python
+# pyspark
+snapshotBeforeOverwrite = spark.sql(snapshotQuery)
+spark.read.format("hudi"). \
+ load(basePath). \
+ select(["uuid", "partitionpath"]). \
+ sort(["partitionpath", "uuid"]). \
+ show(n=100, truncate=False)
+
+inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
+df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)). \
+ filter("partitionpath = 'americas/united_states/san_francisco'")
+hudi_insert_overwrite_options = {
+ 'hoodie.table.name': tableName,
+ 'hoodie.datasource.write.recordkey.field': 'uuid',
+ 'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+ 'hoodie.datasource.write.table.name': tableName,
+ 'hoodie.datasource.write.operation': 'insert_overwrite',
+ 'hoodie.datasource.write.precombine.field': 'ts',
+ 'hoodie.upsert.shuffle.parallelism': 2,
+ 'hoodie.insert.shuffle.parallelism': 2
+}
+df.write.format("hudi").options(**hudi_insert_overwrite_options).mode("append").save(basePath)
+spark.read.format("hudi"). \
+ load(basePath). \
+ select(["uuid", "partitionpath"]). \
+ sort(["partitionpath", "uuid"]). \
+ show(n=100, truncate=False)
+
+# validations
+withoutSanFran = snapshotBeforeOverwrite.filter("partitionpath != 'americas/united_states/san_francisco'")
+expectedDf = withoutSanFran.union(df)
+assert spark.sql(snapshotQuery).exceptAll(expectedDf).count() == 0
```
</TabItem>