Posted to commits@hudi.apache.org by le...@apache.org on 2020/07/02 11:01:42 UTC

[hudi] branch asf-site updated: [MINOR] Add documentation for using multi-column table keys and for not partitioning tables (#1761)

This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/asf-site by this push:
     new a7a9281  [MINOR] Add documentation for using multi-column table keys and for not partitioning tables (#1761)
a7a9281 is described below

commit a7a92815d1ce83e77d13771b2174c5ec7d8558d8
Author: Adam <af...@users.noreply.github.com>
AuthorDate: Thu Jul 2 07:01:29 2020 -0400

    [MINOR] Add documentation for using multi-column table keys and for not partitioning tables (#1761)
---
 docs/_docs/1_1_quick_start_guide.md | 50 +++++++++++++++++------------
 docs/_docs/2_2_writing_data.md      | 64 ++++++++++++++++++++++++++++++-------
 docs/_docs/2_3_querying_data.md     | 11 +++++++
 3 files changed, 93 insertions(+), 32 deletions(-)

diff --git a/docs/_docs/1_1_quick_start_guide.md b/docs/_docs/1_1_quick_start_guide.md
index e5bfaa9..c0894b8 100644
--- a/docs/_docs/1_1_quick_start_guide.md
+++ b/docs/_docs/1_1_quick_start_guide.md
@@ -186,34 +186,43 @@ Delete records for the HoodieKeys passed in.
 ```scala
 // spark-shell
 // fetch total records count
-spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
 // fetch two records to be deleted
-val ds = spark.sql("select uuid, partitionPath from hudi_trips_snapshot").limit(2)
+val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)
 
 // issue deletes
 val deletes = dataGen.generateDeletes(ds.collectAsList())
-val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2));
-df.write.format("hudi").
-  options(getQuickstartWriteConfigs).
-  option(OPERATION_OPT_KEY,"delete").
-  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
-  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
-  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
-  option(TABLE_NAME, tableName).
-  mode(Append).
-  save(basePath)
+val df = spark
+  .read
+  .json(spark.sparkContext.parallelize(deletes, 2))
+
+df
+  .write
+  .format("hudi")
+  .options(getQuickstartWriteConfigs)
+  .option(OPERATION_OPT_KEY,"delete")
+  .option(PRECOMBINE_FIELD_OPT_KEY, "ts")
+  .option(RECORDKEY_FIELD_OPT_KEY, "uuid")
+  .option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath")
+  .option(TABLE_NAME, tableName)
+  .mode(Append)
+  .save(basePath)
 
 // run the same read query as above.
-val roAfterDeleteViewDF = spark.
-  read.
-  format("hudi").
-  load(basePath + "/*/*/*/*")
+val roAfterDeleteViewDF = spark
+  .read
+  .format("hudi")
+  .load(basePath + "/*/*/*/*")
+
 roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
 // fetch should return (total - 2) records
-spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
 ```
 Note: Only `Append` mode is supported for delete operation.
 
+See the [deletion section](/docs/writing_data.html#deletes) of the writing data page for more details.
+
+
 # Pyspark example
 ## Setup
 
@@ -400,9 +409,9 @@ Note: Only `Append` mode is supported for delete operation.
 ```python
 # pyspark
 # fetch total records count
-spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
 # fetch two records to be deleted
-ds = spark.sql("select uuid, partitionPath from hudi_trips_snapshot").limit(2)
+ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)
 
 # issue deletes
 hudi_delete_options = {
@@ -431,9 +440,10 @@ roAfterDeleteViewDF = spark. \
   load(basePath + "/*/*/*/*") 
 roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
 # fetch should return (total - 2) records
-spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
 ```
 
+See the [deletion section](/docs/writing_data.html#deletes) of the writing data page for more details.
 
 
 ## Where to go from here?
diff --git a/docs/_docs/2_2_writing_data.md b/docs/_docs/2_2_writing_data.md
index 52ba503..6962563 100644
--- a/docs/_docs/2_2_writing_data.md
+++ b/docs/_docs/2_2_writing_data.md
@@ -17,8 +17,8 @@ Before that, it may be helpful to understand the 3 different write operations pr
 can be chosen/changed across each commit/deltacommit issued against the table.
 
 
- - **UPSERT** : This is the default operation where the input records are first tagged as inserts or updates by looking up the index and 
- the records are ultimately written after heuristics are run to determine how best to pack them on storage to optimize for things like file sizing. 
+ - **UPSERT** : This is the default operation where the input records are first tagged as inserts or updates by looking up the index. 
+ The records are ultimately written after heuristics are run to determine how best to pack them on storage to optimize for things like file sizing. 
  This operation is recommended for use-cases like database change capture where the input almost certainly contains updates.
  - **INSERT** : This operation is very similar to upsert in terms of heuristics/file sizing but completely skips the index lookup step. Thus, it can be a lot faster than upserts 
  for use-cases like log de-duplication (in conjunction with options to filter duplicates mentioned below). This is also suitable for use-cases where the table can tolerate duplicates, but just 
@@ -176,15 +176,49 @@ In some cases, you may want to migrate your existing table into Hudi beforehand.
 
 ## Datasource Writer
 
-The `hudi-spark` module offers the DataSource API to write (and also read) any data frame into a Hudi table.
-Following is how we can upsert a dataframe, while specifying the field names that need to be used
-for `recordKey => _row_key`, `partitionPath => partition` and `precombineKey => timestamp`
+The `hudi-spark` module offers the DataSource API to write (and read) a Spark DataFrame into a Hudi table. There are a number of options available:
 
+**`HoodieWriteConfig`**:
+
+**TABLE_NAME** (Required)<br>
+
+
+**`DataSourceWriteOptions`**:
+
+**RECORDKEY_FIELD_OPT_KEY** (Required): Primary key field(s). Nested fields can be specified using dot notation, e.g. `a.b.c`. To use multiple columns as the primary key, list them comma-separated, e.g. `"col1,col2,col3,etc"`. Whether the key is a single column or multiple columns is determined by the `KEYGENERATOR_CLASS_OPT_KEY` property.<br>
+Default value: `"uuid"`<br>
+
+**PARTITIONPATH_FIELD_OPT_KEY** (Required): Columns to be used for partitioning the table. To prevent partitioning, provide an empty string as the value, e.g. `""`. Specify partitioning/no partitioning using `KEYGENERATOR_CLASS_OPT_KEY`. If synchronizing to Hive, also specify it using `HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY`.<br>
+Default value: `"partitionpath"`<br>
+
+**PRECOMBINE_FIELD_OPT_KEY** (Required): When two records have the same key value, the record with the largest value in the specified field will be chosen.<br>
+Default value: `"ts"`<br>
+
+**OPERATION_OPT_KEY**: The [write operation](#write-operations) to use.<br>
+Available values:<br>
+`UPSERT_OPERATION_OPT_VAL` (default), `BULK_INSERT_OPERATION_OPT_VAL`, `INSERT_OPERATION_OPT_VAL`, `DELETE_OPERATION_OPT_VAL`
+
+**TABLE_TYPE_OPT_KEY**: The [type of table](/docs/concepts.html#table-types) to write to. Note: After the initial creation of a table, this value must stay consistent when writing to (updating) the table using the Spark `SaveMode.Append` mode.<br>
+Available values:<br>
+[`COW_TABLE_TYPE_OPT_VAL`](/docs/concepts.html#copy-on-write-table) (default), [`MOR_TABLE_TYPE_OPT_VAL`](/docs/concepts.html#merge-on-read-table)
+
+**KEYGENERATOR_CLASS_OPT_KEY**: Key generator class that extracts the key from each incoming record. For a single-column key use `SimpleKeyGenerator`. For multi-column keys use `ComplexKeyGenerator`. Note: A custom key generator class can be written/provided here as well. Primary key columns should be provided via the `RECORDKEY_FIELD_OPT_KEY` option.<br>
+Available values:<br>
+`classOf[SimpleKeyGenerator].getName` (default), `classOf[NonpartitionedKeyGenerator].getName` (Non-partitioned tables can currently only have a single key column, [HUDI-1053](https://issues.apache.org/jira/browse/HUDI-1053)), `classOf[ComplexKeyGenerator].getName`
+
+
+**HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY**: If using Hive, specify whether or not the table should be partitioned.<br>
+Available values:<br>
+`classOf[SlashEncodedDayPartitionValueExtractor].getCanonicalName` (default), `classOf[MultiPartKeysValueExtractor].getCanonicalName`, `classOf[TimestampBasedKeyGenerator].getCanonicalName`, `classOf[NonPartitionedExtractor].getCanonicalName`, `classOf[GlobalDeleteKeyGenerator].getCanonicalName` (to be used when `OPERATION_OPT_KEY` is set to `DELETE_OPERATION_OPT_VAL`)
+
+
+Example:
+Upsert a DataFrame, specifying the necessary field names for `recordKey => _row_key`, `partitionPath => partition`, and `precombineKey => timestamp`
 
 ```java
 inputDF.write()
        .format("org.apache.hudi")
-       .options(clientOpts) // any of the Hudi client opts can be passed in as well
+       .options(clientOpts) //Where clientOpts is of type Map[String, String]. clientOpts can include any other options necessary.
        .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
        .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
        .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp")
@@ -204,8 +238,7 @@ cd hudi-hive
 ./run_sync_tool.sh  --jdbc-url jdbc:hive2:\/\/hiveserver:10000 --user hive --pass hive --partitioned-by partition --base-path <basePath> --database default --table <tableName>
 ```
 
-Starting with Hudi 0.5.1 version read optimized version of merge-on-read tables are suffixed '_ro' by default. For backwards compatibility with older Hudi versions, 
-an optional HiveSyncConfig - `--skip-ro-suffix`, has been provided to turn off '_ro' suffixing if desired. Explore other hive sync options using the following command:
+Starting with Hudi version 0.5.1, the read-optimized versions of merge-on-read tables are suffixed with '_ro' by default. For backwards compatibility with older Hudi versions, an optional HiveSyncConfig, `--skip-ro-suffix`, has been provided to turn off '_ro' suffixing if desired. Explore other Hive sync options using the following command:
 
 ```java
 cd hudi-hive
@@ -218,11 +251,18 @@ cd hudi-hive
 Hudi supports implementing two types of deletes on data stored in Hudi tables, by enabling the user to specify a different record payload implementation. 
 For more info refer to [Delete support in Hudi](https://cwiki.apache.org/confluence/x/6IqvC).
 
- - **Soft Deletes** : With soft deletes, user wants to retain the key but just null out the values for all other fields. 
- This can be simply achieved by ensuring the appropriate fields are nullable in the table schema and simply upserting the table after setting these fields to null.
- - **Hard Deletes** : A stronger form of delete is to physically remove any trace of the record from the table. This can be achieved by issuing an upsert with a custom payload implementation
- via either DataSource or DeltaStreamer which always returns Optional.Empty as the combined value. Hudi ships with a built-in `org.apache.hudi.EmptyHoodieRecordPayload` class that does exactly this.
+ - **Soft Deletes** : Retain the record key and just null out the values for all the other fields. 
+ This can be achieved by ensuring the appropriate fields are nullable in the table schema and simply upserting the table after setting these fields to null.
  
+ - **Hard Deletes** : A stronger form of deletion is to physically remove any trace of the record from the table. This can be achieved in 3 different ways.
+
+   1) Using DataSource, set `OPERATION_OPT_KEY` to `DELETE_OPERATION_OPT_VAL`. This will remove all the records in the DataSet being submitted.
+   
+   2) Using DataSource, set `PAYLOAD_CLASS_OPT_KEY` to `"org.apache.hudi.EmptyHoodieRecordPayload"`. This will remove all the records in the DataSet being submitted. 
+   
+   3) Using DataSource or DeltaStreamer, add a column named `_hoodie_is_deleted` to the DataSet. The value of this column must be set to `true` for all the records to be deleted and either set to `false` or left null for any records which are to be upserted.
+    
+Example using hard delete method 2, which removes from the table all the records that exist in the DataSet `deleteDF`:
 ```java
  deleteDF // dataframe containing just records to be deleted
    .write().format("org.apache.hudi")
diff --git a/docs/_docs/2_3_querying_data.md b/docs/_docs/2_3_querying_data.md
index b37fa12..4b62a3d 100644
--- a/docs/_docs/2_3_querying_data.md
+++ b/docs/_docs/2_3_querying_data.md
@@ -136,6 +136,17 @@ The Spark Datasource API is a popular way of authoring Spark ETL pipelines. Hudi
 datasources work (e.g: `spark.read.parquet`). Both snapshot querying and incremental querying are supported here. Typically spark jobs require adding `--jars <path to jar>/hudi-spark-bundle_2.11-<hudi version>.jar` to classpath of drivers 
 and executors. Alternatively, hudi-spark-bundle can also fetched via the `--packages` options (e.g: `--packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.3`).
 
+### Snapshot query {#spark-snap-query}
+This method can be used to retrieve the table's data as of the present point in time.
+Note: The file path must be suffixed with a number of wildcard asterisks (`/*`) one greater than the number of partition levels. E.g., with table file path "tablePath" partitioned by columns "a", "b", and "c", the load path must be `tablePath + "/*/*/*/*"`
+
+```scala
+val hudiSnapshotQueryDF = spark
+     .read
+     .format("org.apache.hudi")
+     .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+     .load(tablePath + "/*") // The number of wildcard asterisks here must be one greater than the number of partition levels
+```
 
 ### Incremental query {#spark-incr-query}
 Of special interest to spark pipelines, is Hudi's ability to support incremental queries, like below. A sample incremental query, that will obtain all records written since `beginInstantTime`, looks like below.