You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/04/17 04:04:02 UTC

[GitHub] [incubator-hudi] EdwinGuo opened a new pull request #1526: Hudi 783

EdwinGuo opened a new pull request #1526: Hudi 783
URL: https://github.com/apache/incubator-hudi/pull/1526
 
 
   ## *Tips*
   - *Thank you very much for contributing to Apache Hudi.*
   - *Please review https://hudi.apache.org/contributing.html before opening a pull request.*
   
   ## What is the purpose of the pull request
   To add the example of hudi pyspark
   *(For example: This pull request adds quick-start document.)*
   
   ## Brief change log
   Modify 1_1_quick_start doc.
   *(for example:)*
     - *Modify AnnotationLocation checkstyle rule in checkstyle.xml*
   
   ## Verify this pull request
   
   *(Please pick either of the following options)*
   
   This pull request is a trivial rework / code cleanup without any test coverage.
   yes
   *(or)*
   
   This pull request is already covered by existing tests, such as *(please describe tests)*.
   
   (or)
   
   This change added tests and can be verified as follows:
   
   *(example:)*
   
     - *Added integration tests for end-to-end.*
     - *Added HoodieClientWriteTest to verify the change.*
     - *Manually verified the change by running a job locally.*
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
   
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-hudi] lamber-ken edited a comment on issue #1526: [HUDI-1526] Add pyspark example in quickstart

Posted by GitBox <gi...@apache.org>.
lamber-ken edited a comment on issue #1526: [HUDI-1526] Add pyspark example in quickstart
URL: https://github.com/apache/incubator-hudi/pull/1526#issuecomment-615497198
 
 
   hi @bhasudha, do you have any suggestions? you can visit https://lamber-ken.github.io/docs/quick-start-guide.html

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-hudi] lamber-ken commented on a change in pull request #1526: [HUDI-1526] Add pyspark example in quickstart

Posted by GitBox <gi...@apache.org>.
lamber-ken commented on a change in pull request #1526: [HUDI-1526] Add pyspark example in quickstart
URL: https://github.com/apache/incubator-hudi/pull/1526#discussion_r410501360
 
 

 ##########
 File path: docs/_docs/1_1_quick_start_guide.md
 ##########
 @@ -68,6 +81,27 @@ df.write.format("hudi").
   save(basePath)
 ``` 
 
+{% highlight python %}
+inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
+df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
+
+hudi_options = {
+'hoodie.table.name': tableName,
+'hoodie.datasource.write.recordkey.field': 'uuid',
+'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+'hoodie.datasource.write.table.name': tableName,
+'hoodie.datasource.write.operation': 'insert',
+'hoodie.datasource.write.precombine.field': 'ts',
+'hoodie.upsert.shuffle.parallelism': 2, 
+'hoodie.insert.shuffle.parallelism': 2
+}
 
 Review comment:
   Add two spaces before each line.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-hudi] lamber-ken commented on issue #1526: [HUDI-1526] Add pyspark example in quickstart

Posted by GitBox <gi...@apache.org>.
lamber-ken commented on issue #1526: [HUDI-1526] Add pyspark example in quickstart
URL: https://github.com/apache/incubator-hudi/pull/1526#issuecomment-615497198
 
 
   hi @bhasudha, do you have any suggestions? 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-hudi] lamber-ken commented on issue #1526: [HUDI-1526] Add pyspark example in quickstart

Posted by GitBox <gi...@apache.org>.
lamber-ken commented on issue #1526: [HUDI-1526] Add pyspark example in quickstart
URL: https://github.com/apache/incubator-hudi/pull/1526#issuecomment-615494434
 
 
   Thanks for your contribution, left minor comments. Visit https://lamber-ken.github.io/docs/quick-start-guide.html

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-hudi] lamber-ken commented on a change in pull request #1526: [HUDI-1526] Add pyspark example in quickstart

Posted by GitBox <gi...@apache.org>.
lamber-ken commented on a change in pull request #1526: [HUDI-1526] Add pyspark example in quickstart
URL: https://github.com/apache/incubator-hudi/pull/1526#discussion_r410501516
 
 

 ##########
 File path: docs/_docs/1_1_quick_start_guide.md
 ##########
 @@ -148,6 +203,31 @@ tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
 spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()
 ``` 
 
+{% highlight python %}
+# reload data
+spark. \
+  read. \
+  format("hudi"). \
+  load(basePath + "/*/*/*/*"). \
+  createOrReplaceTempView("hudi_trips_snapshot")
+
+commits = list(map(lambda row: row[0], spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").limit(50).collect()))
+beginTime = commits[len(commits) - 2] # commit time we are interested in
+
+# incrementally query data
+incremental_read_options = {
+'hoodie.datasource.query.type': 'incremental',
+'hoodie.datasource.read.begin.instanttime': 'beginTime',
 
 Review comment:
   Add two spaces before each line

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-hudi] vingov commented on issue #1526: [HUDI-1526] Add pyspark example in quickstart

Posted by GitBox <gi...@apache.org>.
vingov commented on issue #1526: [HUDI-1526] Add pyspark example in quickstart
URL: https://github.com/apache/incubator-hudi/pull/1526#issuecomment-616051296
 
 
   Thanks for the contribution @EdwinGuo! This is a good start, I am working on a similar task to add documentation and python package for hudi - [HUDI-783]( https://issues.apache.org/jira/browse/HUDI-783)
   
   I checked the updated quick-start-guide, I feel its a bit confusing when we add the python example just below the scala example, instead if we follow a style guide like the [official spark documentation](https://spark.apache.org/docs/latest/sql-getting-started.html), having tabs for each language code, it will be easy for the users to follow.
   
   Thoughts?
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-hudi] EdwinGuo commented on issue #1526: [HUDI-1526] Add pyspark example in quickstart

Posted by GitBox <gi...@apache.org>.
EdwinGuo commented on issue #1526: [HUDI-1526] Add pyspark example in quickstart
URL: https://github.com/apache/incubator-hudi/pull/1526#issuecomment-616121562
 
 
   > Thanks for the contribution @EdwinGuo! This is a good start, I am working on a similar task to add documentation and python package for hudi - [HUDI-783](https://issues.apache.org/jira/browse/HUDI-783)
   > 
   > I checked the updated quick-start-guide, I feel its a bit confusing when we add the python example just below the scala example, instead if we follow a style guide like the [official spark documentation](https://spark.apache.org/docs/latest/sql-getting-started.html), having tabs for each language code, it will be easy for the users to follow.
   > 
   > Thoughts?
   
   Hi @vingov  Thanks for the comments. I agree with you and in fact, I was discussing this point with @lamber-ken and I'm investigating how to enable the tab feature. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-hudi] lamber-ken commented on pull request #1526: [HUDI-783] Add pyspark example in quickstart

Posted by GitBox <gi...@apache.org>.
lamber-ken commented on pull request #1526:
URL: https://github.com/apache/incubator-hudi/pull/1526#issuecomment-622971831


   Thanks @EdwinGuo @vingov, LGTM, left two minor comments. 👍 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] EdwinGuo commented on pull request #1526: [HUDI-1526] Add pyspark example in quickstart

Posted by GitBox <gi...@apache.org>.
EdwinGuo commented on pull request #1526:
URL: https://github.com/apache/incubator-hudi/pull/1526#issuecomment-619571735


   @lamber-ken @vingov Updated. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] vingov commented on a change in pull request #1526: [HUDI-1526] Add pyspark example in quickstart

Posted by GitBox <gi...@apache.org>.
vingov commented on a change in pull request #1526:
URL: https://github.com/apache/incubator-hudi/pull/1526#discussion_r415994599



##########
File path: docs/_docs/1_1_quick_start_guide.md
##########
@@ -204,6 +213,262 @@ spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
 ```
 Note: Only `Append` mode is supported for delete operation.
 
+# Pyspark example
+## Setup
+
+Hudi works with Spark-2.x versions. You can follow instructions [here](https://spark.apache.org/downloads.html) for setting up spark. 
+From the extracted directory run spark-shell with Hudi as:
+
+```python
+# pyspark
+export PYSPARK_PYTHON=$(which python3)
+spark-2.4.4-bin-hadoop2.7/bin/pyspark \
+  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
+  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
+```
+
+<div class="notice--info">
+  <h4>Please note the following: </h4>
+<ul>
+  <li>spark-avro module needs to be specified in --packages as it is not included with spark-shell by default</li>
+  <li>spark-avro and spark versions must match (we have used 2.4.4 for both above)</li>
+  <li>we have used hudi-spark-bundle built for scala 2.11 since the spark-avro module used also depends on 2.11. 
+         If spark-avro_2.12 is used, correspondingly hudi-spark-bundle_2.12 needs to be used. </li>
+</ul>
+</div>
+
+Setup table name, base path and a data generator to generate records for this guide.
+
+```python
+# pyspark
+tableName = "hudi_trips_cow"
+basePath = "file:///tmp/hudi_trips_cow"
+dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
+```
+
+The [DataGenerator](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L50) 
+can generate sample inserts and updates based on the the sample trip schema [here](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L57)
+{: .notice--info}
+
+
+## Insert data
+
+Generate some new trips, load them into a DataFrame and write the DataFrame into the Hudi table as below.
+
+```python
+# pyspark
+inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
+df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
+
+hudi_options = {
+  'hoodie.table.name': tableName,
+  'hoodie.datasource.write.recordkey.field': 'uuid',
+  'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+  'hoodie.datasource.write.table.name': tableName,
+  'hoodie.datasource.write.operation': 'insert',
+  'hoodie.datasource.write.precombine.field': 'ts',
+  'hoodie.upsert.shuffle.parallelism': 2, 
+  'hoodie.insert.shuffle.parallelism': 2
+}
+
+df.write.format("hudi"). \
+  options(**hudi_options). \
+  mode("overwrite"). \
+  save(basePath)
+```
+
+`mode(Overwrite)` overwrites and recreates the table if it already exists.
+You can check the data generated under `/tmp/hudi_trips_cow/<region>/<country>/<city>/`. We provided a record key 
+(`uuid` in [schema](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L58)), partition field (`region/county/city`) and combine logic (`ts` in 
+[schema](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L58)) to ensure trip records are unique within each partition. For more info, refer to 
+[Modeling data stored in Hudi](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=113709185#FAQ-HowdoImodelthedatastoredinHudi)
+and for info on ways to ingest data into Hudi, refer to [Writing Hudi Tables](/docs/writing_data.html).
+Here we are using the default write operation : `upsert`. If you have a workload without updates, you can also issue 
+`insert` or `bulk_insert` operations which could be faster. To know more, refer to [Write operations](/docs/writing_data#write-operations)
+{: .notice--info}
+
+## Query data 
+
+Load the data files into a DataFrame.
+
+```python
+# pyspark
+tripsSnapshotDF = spark. \
+  read. \
+  format("hudi"). \
+  load(basePath + "/*/*/*/*")
+
+tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
+
+spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
+spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()
+```
+
+This query provides snapshot querying of the ingested data. Since our partition path (`region/country/city`) is 3 levels nested 
+from base path we ve used `load(basePath + "/*/*/*/*")`. 
+Refer to [Table types and queries](/docs/concepts#table-types--queries) for more info on all table types and query types supported.
+{: .notice--info}
+
+## Update data
+
+This is similar to inserting new data. Generate updates to existing trips using the data generator, load into a DataFrame 
+and write DataFrame into the hudi table.
+
+```python
+# pyspark
+updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
+df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
+df.write.format("hudi"). \
+  options(**hudi_options). \
+  mode("append"). \
+  save(basePath)
+```
+
+Notice that the save mode is now `Append`. In general, always use append mode unless you are trying to create the table for the first time.
+[Querying](#query-data) the data again will now show updated trips. Each write operation generates a new [commit](http://hudi.incubator.apache.org/docs/concepts.html) 
+denoted by the timestamp. Look for changes in `_hoodie_commit_time`, `rider`, `driver` fields for the same `_hoodie_record_key`s in previous commit. 
+{: .notice--info}
+
+## Incremental query
+
+Hudi also provides capability to obtain a stream of records that changed since given commit timestamp. 
+This can be achieved using Hudi's incremental querying and providing a begin time from which changes need to be streamed. 
+We do not need to specify endTime, if we want all changes after the given commit (as is the common case). 
+
+```python
+# pyspark
+# reload data
+spark. \
+  read. \
+  format("hudi"). \
+  load(basePath + "/*/*/*/*"). \
+  createOrReplaceTempView("hudi_trips_snapshot")
+
+commits = list(map(lambda row: row[0], spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").limit(50).collect()))
+beginTime = commits[len(commits) - 2] # commit time we are interested in
+
+# incrementally query data
+incremental_read_options = {
+  'hoodie.datasource.query.type': 'incremental',
+  'hoodie.datasource.read.begin.instanttime': 'beginTime',
+}
+
+tripsIncrementalDF = spark.read.format("hudi"). \
+  options(**incremental_read_options). \
+  load(basePath)
+tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
+
+spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()
+```
+
+This will give all changes that happened after the beginTime commit with the filter of fare > 20.0. The unique thing about this
+feature is that it now lets you author streaming pipelines on batch data.
+{: .notice--info}
+
+```
+
+## Incremental query
+
+Hudi also provides capability to obtain a stream of records that changed since given commit timestamp. 
+This can be achieved using Hudi's incremental querying and providing a begin time from which changes need to be streamed. 
+We do not need to specify endTime, if we want all changes after the given commit (as is the common case). 
+
+```python
+# pyspark
+# reload data
+spark. \
+  read. \
+  format("hudi"). \
+  load(basePath + "/*/*/*/*"). \
+  createOrReplaceTempView("hudi_trips_snapshot")
+
+commits = list(map(lambda row: row[0], spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").limit(50).collect()))
+beginTime = commits[len(commits) - 2] # commit time we are interested in
+
+# incrementally query data
+incremental_read_options = {
+  'hoodie.datasource.query.type': 'incremental',
+  'hoodie.datasource.read.begin.instanttime': 'beginTime',
+}
+
+tripsIncrementalDF = spark.read.format("hudi"). \
+  options(**incremental_read_options). \
+  load(basePath)
+tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
+
+spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()
+```
+
+This will give all changes that happened after the beginTime commit with the filter of fare > 20.0. The unique thing about this
+feature is that it now lets you author streaming pipelines on batch data.
+{: .notice--info}

Review comment:
       LGTM, Thanks!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] vingov commented on pull request #1526: [HUDI-1526] Add pyspark example in quickstart

Posted by GitBox <gi...@apache.org>.
vingov commented on pull request #1526:
URL: https://github.com/apache/incubator-hudi/pull/1526#issuecomment-620115770


   I've validated the pyspark example code using the hudi demo docker setup, it looks good to me.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] EdwinGuo commented on pull request #1526: [HUDI-1526] Add pyspark example in quickstart

Posted by GitBox <gi...@apache.org>.
EdwinGuo commented on pull request #1526:
URL: https://github.com/apache/incubator-hudi/pull/1526#issuecomment-619559348


   > @EdwinGuo - Thanks for trying multiple options, I thought it would be easy to create those language-specific tabs, if it too much work, we can create a separate page for pyspark code.
   > 
   > If we were using docusaurus, it would be super easy to create these tab navigation - https://docusaurus.io/docs/en/doc-markdown#language-specific-code-tabs
   
   Sounds good. Let me switch to tab navigation.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] EdwinGuo commented on issue #1526: [HUDI-1526] Add pyspark example in quickstart

Posted by GitBox <gi...@apache.org>.
EdwinGuo commented on issue #1526:
URL: https://github.com/apache/incubator-hudi/pull/1526#issuecomment-617160405


   @vingov  I had copied the https://github.com/apache/spark/blob/15462e1a8fa8da54ac51f4d21f567f3c288e6701/docs/js/main.js and reference the js lib like: https://raw.githubusercontent.com/apache/spark/46be1e01e977788f00f1f5aa8d64bc5f191bc578/docs/sql-getting-started.md but I got no luck. Maybe re-orgnize the page with spark-shell code snippet on the top of the quick start guide and pyspark on the bottom of the page?  @lamber-ken  what do you think?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] vinothchandar commented on issue #1526: [HUDI-1526] Add pyspark example in quickstart

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on issue #1526:
URL: https://github.com/apache/incubator-hudi/pull/1526#issuecomment-616686836


   @vingov does this supercede your work? Or you could add more on top? Trying to understand how’d these two are related.. 
   
   In any case, do you mind reviewing this since you have this working at uber anyway. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] EdwinGuo commented on a change in pull request #1526: [HUDI-1526] Add pyspark example in quickstart

Posted by GitBox <gi...@apache.org>.
EdwinGuo commented on a change in pull request #1526:
URL: https://github.com/apache/incubator-hudi/pull/1526#discussion_r415763475



##########
File path: docs/_docs/1_1_quick_start_guide.md
##########
@@ -204,6 +213,262 @@ spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
 ```
 Note: Only `Append` mode is supported for delete operation.
 
+# Pyspark example
+## Setup
+
+Hudi works with Spark-2.x versions. You can follow instructions [here](https://spark.apache.org/downloads.html) for setting up spark. 
+From the extracted directory run spark-shell with Hudi as:
+
+```python
+# pyspark
+export PYSPARK_PYTHON=$(which python3)
+spark-2.4.4-bin-hadoop2.7/bin/pyspark \
+  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
+  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
+```
+
+<div class="notice--info">
+  <h4>Please note the following: </h4>
+<ul>
+  <li>spark-avro module needs to be specified in --packages as it is not included with spark-shell by default</li>
+  <li>spark-avro and spark versions must match (we have used 2.4.4 for both above)</li>
+  <li>we have used hudi-spark-bundle built for scala 2.11 since the spark-avro module used also depends on 2.11. 
+         If spark-avro_2.12 is used, correspondingly hudi-spark-bundle_2.12 needs to be used. </li>
+</ul>
+</div>
+
+Setup table name, base path and a data generator to generate records for this guide.
+
+```python
+# pyspark
+tableName = "hudi_trips_cow"
+basePath = "file:///tmp/hudi_trips_cow"
+dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
+```
+
+The [DataGenerator](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L50) 
+can generate sample inserts and updates based on the the sample trip schema [here](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L57)
+{: .notice--info}
+
+
+## Insert data
+
+Generate some new trips, load them into a DataFrame and write the DataFrame into the Hudi table as below.
+
+```python
+# pyspark
+inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
+df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
+
+hudi_options = {
+  'hoodie.table.name': tableName,
+  'hoodie.datasource.write.recordkey.field': 'uuid',
+  'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+  'hoodie.datasource.write.table.name': tableName,
+  'hoodie.datasource.write.operation': 'insert',
+  'hoodie.datasource.write.precombine.field': 'ts',
+  'hoodie.upsert.shuffle.parallelism': 2, 
+  'hoodie.insert.shuffle.parallelism': 2
+}
+
+df.write.format("hudi"). \
+  options(**hudi_options). \
+  mode("overwrite"). \
+  save(basePath)
+```
+
+`mode(Overwrite)` overwrites and recreates the table if it already exists.
+You can check the data generated under `/tmp/hudi_trips_cow/<region>/<country>/<city>/`. We provided a record key 
+(`uuid` in [schema](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L58)), partition field (`region/county/city`) and combine logic (`ts` in 
+[schema](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L58)) to ensure trip records are unique within each partition. For more info, refer to 
+[Modeling data stored in Hudi](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=113709185#FAQ-HowdoImodelthedatastoredinHudi)
+and for info on ways to ingest data into Hudi, refer to [Writing Hudi Tables](/docs/writing_data.html).
+Here we are using the default write operation : `upsert`. If you have a workload without updates, you can also issue 
+`insert` or `bulk_insert` operations which could be faster. To know more, refer to [Write operations](/docs/writing_data#write-operations)
+{: .notice--info}
+
+## Query data 
+
+Load the data files into a DataFrame.
+
+```python
+# pyspark
+tripsSnapshotDF = spark. \
+  read. \
+  format("hudi"). \
+  load(basePath + "/*/*/*/*")
+
+tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
+
+spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
+spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()
+```
+
+This query provides snapshot querying of the ingested data. Since our partition path (`region/country/city`) is 3 levels nested 
+from base path we ve used `load(basePath + "/*/*/*/*")`. 
+Refer to [Table types and queries](/docs/concepts#table-types--queries) for more info on all table types and query types supported.
+{: .notice--info}
+
+## Update data
+
+This is similar to inserting new data. Generate updates to existing trips using the data generator, load into a DataFrame 
+and write DataFrame into the hudi table.
+
+```python
+# pyspark
+updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
+df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
+df.write.format("hudi"). \
+  options(**hudi_options). \
+  mode("append"). \
+  save(basePath)
+```
+
+Notice that the save mode is now `Append`. In general, always use append mode unless you are trying to create the table for the first time.
+[Querying](#query-data) the data again will now show updated trips. Each write operation generates a new [commit](http://hudi.incubator.apache.org/docs/concepts.html) 
+denoted by the timestamp. Look for changes in `_hoodie_commit_time`, `rider`, `driver` fields for the same `_hoodie_record_key`s in previous commit. 
+{: .notice--info}
+
+## Incremental query
+
+Hudi also provides capability to obtain a stream of records that changed since given commit timestamp. 
+This can be achieved using Hudi's incremental querying and providing a begin time from which changes need to be streamed. 
+We do not need to specify endTime, if we want all changes after the given commit (as is the common case). 
+
+```python
+# pyspark
+# reload data
+spark. \
+  read. \
+  format("hudi"). \
+  load(basePath + "/*/*/*/*"). \
+  createOrReplaceTempView("hudi_trips_snapshot")
+
+commits = list(map(lambda row: row[0], spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").limit(50).collect()))
+beginTime = commits[len(commits) - 2] # commit time we are interested in
+
+# incrementally query data
+incremental_read_options = {
+  'hoodie.datasource.query.type': 'incremental',
+  'hoodie.datasource.read.begin.instanttime': 'beginTime',
+}
+
+tripsIncrementalDF = spark.read.format("hudi"). \
+  options(**incremental_read_options). \
+  load(basePath)
+tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
+
+spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()
+```
+
+This will give all changes that happened after the beginTime commit with the filter of fare > 20.0. The unique thing about this
+feature is that it now lets you author streaming pipelines on batch data.
+{: .notice--info}
+
+```
+
+## Incremental query
+
+Hudi also provides capability to obtain a stream of records that changed since given commit timestamp. 
+This can be achieved using Hudi's incremental querying and providing a begin time from which changes need to be streamed. 
+We do not need to specify endTime, if we want all changes after the given commit (as is the common case). 
+
+```python
+# pyspark
+# reload data
+spark. \
+  read. \
+  format("hudi"). \
+  load(basePath + "/*/*/*/*"). \
+  createOrReplaceTempView("hudi_trips_snapshot")
+
+commits = list(map(lambda row: row[0], spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").limit(50).collect()))
+beginTime = commits[len(commits) - 2] # commit time we are interested in
+
+# incrementally query data
+incremental_read_options = {
+  'hoodie.datasource.query.type': 'incremental',
+  'hoodie.datasource.read.begin.instanttime': 'beginTime',
+}
+
+tripsIncrementalDF = spark.read.format("hudi"). \
+  options(**incremental_read_options). \
+  load(basePath)
+tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
+
+spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()
+```
+
+This will give all changes that happened after the beginTime commit with the filter of fare > 20.0. The unique thing about this
+feature is that it now lets you author streaming pipelines on batch data.
+{: .notice--info}

Review comment:
       Updated. Thanks

##########
File path: docs/_docs/1_1_quick_start_guide.md
##########
@@ -204,6 +213,262 @@ spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
 ```
 Note: Only `Append` mode is supported for delete operation.
 
+# Pyspark example
+## Setup
+
+Hudi works with Spark-2.x versions. You can follow instructions [here](https://spark.apache.org/downloads.html) for setting up spark. 
+From the extracted directory run spark-shell with Hudi as:
+
+```python
+# pyspark
+export PYSPARK_PYTHON=$(which python3)
+spark-2.4.4-bin-hadoop2.7/bin/pyspark \
+  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
+  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
+```
+
+<div class="notice--info">
+  <h4>Please note the following: </h4>
+<ul>
+  <li>spark-avro module needs to be specified in --packages as it is not included with spark-shell by default</li>
+  <li>spark-avro and spark versions must match (we have used 2.4.4 for both above)</li>
+  <li>we have used hudi-spark-bundle built for scala 2.11 since the spark-avro module used also depends on 2.11. 
+         If spark-avro_2.12 is used, correspondingly hudi-spark-bundle_2.12 needs to be used. </li>
+</ul>
+</div>
+
+Setup table name, base path and a data generator to generate records for this guide.
+
+```python
+# pyspark
+tableName = "hudi_trips_cow"
+basePath = "file:///tmp/hudi_trips_cow"
+dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
+```
+
+The [DataGenerator](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L50) 
+can generate sample inserts and updates based on the the sample trip schema [here](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L57)
+{: .notice--info}
+
+
+## Insert data
+
+Generate some new trips, load them into a DataFrame and write the DataFrame into the Hudi table as below.
+
+```python
+# pyspark
+inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
+df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
+
+hudi_options = {
+  'hoodie.table.name': tableName,
+  'hoodie.datasource.write.recordkey.field': 'uuid',
+  'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+  'hoodie.datasource.write.table.name': tableName,
+  'hoodie.datasource.write.operation': 'insert',
+  'hoodie.datasource.write.precombine.field': 'ts',
+  'hoodie.upsert.shuffle.parallelism': 2, 
+  'hoodie.insert.shuffle.parallelism': 2
+}
+
+df.write.format("hudi"). \
+  options(**hudi_options). \
+  mode("overwrite"). \
+  save(basePath)
+```
+
+`mode(Overwrite)` overwrites and recreates the table if it already exists.
+You can check the data generated under `/tmp/hudi_trips_cow/<region>/<country>/<city>/`. We provided a record key 
+(`uuid` in [schema](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L58)), partition field (`region/county/city`) and combine logic (`ts` in 
+[schema](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L58)) to ensure trip records are unique within each partition. For more info, refer to 
+[Modeling data stored in Hudi](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=113709185#FAQ-HowdoImodelthedatastoredinHudi)
+and for info on ways to ingest data into Hudi, refer to [Writing Hudi Tables](/docs/writing_data.html).
+Here we are using the default write operation : `upsert`. If you have a workload without updates, you can also issue 
+`insert` or `bulk_insert` operations which could be faster. To know more, refer to [Write operations](/docs/writing_data#write-operations)
+{: .notice--info}
+
+## Query data 
+
+Load the data files into a DataFrame.
+
+```python
+# pyspark
+tripsSnapshotDF = spark. \
+  read. \
+  format("hudi"). \
+  load(basePath + "/*/*/*/*")
+
+tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
+
+spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
+spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()
+```
+
+This query provides snapshot querying of the ingested data. Since our partition path (`region/country/city`) is 3 levels nested 
+from base path we ve used `load(basePath + "/*/*/*/*")`. 
+Refer to [Table types and queries](/docs/concepts#table-types--queries) for more info on all table types and query types supported.
+{: .notice--info}
+
+## Update data
+
+This is similar to inserting new data. Generate updates to existing trips using the data generator, load into a DataFrame 
+and write DataFrame into the hudi table.
+
+```python
+# pyspark
+updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
+df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
+df.write.format("hudi"). \
+  options(**hudi_options). \
+  mode("append"). \
+  save(basePath)
+```
+
+Notice that the save mode is now `Append`. In general, always use append mode unless you are trying to create the table for the first time.
+[Querying](#query-data) the data again will now show updated trips. Each write operation generates a new [commit](http://hudi.incubator.apache.org/docs/concepts.html) 
+denoted by the timestamp. Look for changes in `_hoodie_commit_time`, `rider`, `driver` fields for the same `_hoodie_record_key`s in previous commit. 
+{: .notice--info}
+
+## Incremental query
+
+Hudi also provides capability to obtain a stream of records that changed since given commit timestamp. 
+This can be achieved using Hudi's incremental querying and providing a begin time from which changes need to be streamed. 
+We do not need to specify endTime, if we want all changes after the given commit (as is the common case). 
+
+```python
+# pyspark
+# reload data
+spark. \
+  read. \
+  format("hudi"). \
+  load(basePath + "/*/*/*/*"). \
+  createOrReplaceTempView("hudi_trips_snapshot")
+
+commits = list(map(lambda row: row[0], spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").limit(50).collect()))
+beginTime = commits[len(commits) - 2] # commit time we are interested in
+
+# incrementally query data
+incremental_read_options = {
+  'hoodie.datasource.query.type': 'incremental',
+  'hoodie.datasource.read.begin.instanttime': 'beginTime',

Review comment:
       Updated. Thanks




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] vingov commented on issue #1526: [HUDI-1526] Add pyspark example in quickstart

Posted by GitBox <gi...@apache.org>.
vingov commented on issue #1526:
URL: https://github.com/apache/incubator-hudi/pull/1526#issuecomment-616768714


   @vinothchandar - This is similar to the blog post draft I have prepared, which explains the usage of the hudi reader/writer with pyspark. I will review the example code.
   
   @EdwinGuo - If adding a python code tab is difficult with markdown, let's go with a separate page for explaining the python usage.
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] vingov commented on a change in pull request #1526: [HUDI-1526] Add pyspark example in quickstart

Posted by GitBox <gi...@apache.org>.
vingov commented on a change in pull request #1526:
URL: https://github.com/apache/incubator-hudi/pull/1526#discussion_r415994373



##########
File path: docs/_docs/1_1_quick_start_guide.md
##########
@@ -204,6 +213,262 @@ spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
 ```
 Note: Only `Append` mode is supported for delete operation.
 
+# Pyspark example
+## Setup
+
+Hudi works with Spark-2.x versions. You can follow instructions [here](https://spark.apache.org/downloads.html) for setting up spark. 
+From the extracted directory run spark-shell with Hudi as:
+
+```python
+# pyspark
+export PYSPARK_PYTHON=$(which python3)
+spark-2.4.4-bin-hadoop2.7/bin/pyspark \
+  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
+  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
+```
+
+<div class="notice--info">
+  <h4>Please note the following: </h4>
+<ul>
+  <li>spark-avro module needs to be specified in --packages as it is not included with spark-shell by default</li>
+  <li>spark-avro and spark versions must match (we have used 2.4.4 for both above)</li>
+  <li>we have used hudi-spark-bundle built for scala 2.11 since the spark-avro module used also depends on 2.11. 
+         If spark-avro_2.12 is used, correspondingly hudi-spark-bundle_2.12 needs to be used. </li>
+</ul>
+</div>
+
+Setup table name, base path and a data generator to generate records for this guide.
+
+```python
+# pyspark
+tableName = "hudi_trips_cow"
+basePath = "file:///tmp/hudi_trips_cow"
+dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
+```
+
+The [DataGenerator](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L50) 
+can generate sample inserts and updates based on the the sample trip schema [here](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L57)
+{: .notice--info}
+
+
+## Insert data
+
+Generate some new trips, load them into a DataFrame and write the DataFrame into the Hudi table as below.
+
+```python
+# pyspark
+inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
+df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
+
+hudi_options = {
+  'hoodie.table.name': tableName,
+  'hoodie.datasource.write.recordkey.field': 'uuid',
+  'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+  'hoodie.datasource.write.table.name': tableName,
+  'hoodie.datasource.write.operation': 'insert',
+  'hoodie.datasource.write.precombine.field': 'ts',
+  'hoodie.upsert.shuffle.parallelism': 2, 
+  'hoodie.insert.shuffle.parallelism': 2
+}
+
+df.write.format("hudi"). \
+  options(**hudi_options). \
+  mode("overwrite"). \
+  save(basePath)
+```
+
+`mode(Overwrite)` overwrites and recreates the table if it already exists.
+You can check the data generated under `/tmp/hudi_trips_cow/<region>/<country>/<city>/`. We provided a record key 
+(`uuid` in [schema](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L58)), partition field (`region/county/city`) and combine logic (`ts` in 
+[schema](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L58)) to ensure trip records are unique within each partition. For more info, refer to 
+[Modeling data stored in Hudi](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=113709185#FAQ-HowdoImodelthedatastoredinHudi)
+and for info on ways to ingest data into Hudi, refer to [Writing Hudi Tables](/docs/writing_data.html).
+Here we are using the default write operation : `upsert`. If you have a workload without updates, you can also issue 
+`insert` or `bulk_insert` operations which could be faster. To know more, refer to [Write operations](/docs/writing_data#write-operations)
+{: .notice--info}
+
+## Query data 
+
+Load the data files into a DataFrame.
+
+```python
+# pyspark
+tripsSnapshotDF = spark. \
+  read. \
+  format("hudi"). \
+  load(basePath + "/*/*/*/*")
+
+tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
+
+spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
+spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()
+```
+
+This query provides snapshot querying of the ingested data. Since our partition path (`region/country/city`) is 3 levels nested 
+from base path we ve used `load(basePath + "/*/*/*/*")`. 
+Refer to [Table types and queries](/docs/concepts#table-types--queries) for more info on all table types and query types supported.
+{: .notice--info}
+
+## Update data
+
+This is similar to inserting new data. Generate updates to existing trips using the data generator, load into a DataFrame 
+and write DataFrame into the hudi table.
+
+```python
+# pyspark
+updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
+df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
+df.write.format("hudi"). \
+  options(**hudi_options). \
+  mode("append"). \
+  save(basePath)
+```
+
+Notice that the save mode is now `Append`. In general, always use append mode unless you are trying to create the table for the first time.
+[Querying](#query-data) the data again will now show updated trips. Each write operation generates a new [commit](http://hudi.incubator.apache.org/docs/concepts.html) 
+denoted by the timestamp. Look for changes in `_hoodie_commit_time`, `rider`, `driver` fields for the same `_hoodie_record_key`s in previous commit. 
+{: .notice--info}
+
+## Incremental query
+
+Hudi also provides capability to obtain a stream of records that changed since given commit timestamp. 
+This can be achieved using Hudi's incremental querying and providing a begin time from which changes need to be streamed. 
+We do not need to specify endTime, if we want all changes after the given commit (as is the common case). 
+
+```python
+# pyspark
+# reload data
+spark. \
+  read. \
+  format("hudi"). \
+  load(basePath + "/*/*/*/*"). \
+  createOrReplaceTempView("hudi_trips_snapshot")
+
+commits = list(map(lambda row: row[0], spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").limit(50).collect()))
+beginTime = commits[len(commits) - 2] # commit time we are interested in
+
+# incrementally query data
+incremental_read_options = {
+  'hoodie.datasource.query.type': 'incremental',
+  'hoodie.datasource.read.begin.instanttime': 'beginTime',

Review comment:
       LGTM, Thanks @EdwinGuo 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] vingov commented on a change in pull request #1526: [HUDI-1526] Add pyspark example in quickstart

Posted by GitBox <gi...@apache.org>.
vingov commented on a change in pull request #1526:
URL: https://github.com/apache/incubator-hudi/pull/1526#discussion_r415503036



##########
File path: docs/_docs/1_1_quick_start_guide.md
##########
@@ -204,6 +213,262 @@ spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
 ```
 Note: Only `Append` mode is supported for delete operation.
 
+# Pyspark example
+## Setup
+
+Hudi works with Spark-2.x versions. You can follow instructions [here](https://spark.apache.org/downloads.html) for setting up spark. 
+From the extracted directory run spark-shell with Hudi as:
+
+```python
+# pyspark
+export PYSPARK_PYTHON=$(which python3)
+spark-2.4.4-bin-hadoop2.7/bin/pyspark \
+  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
+  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
+```
+
+<div class="notice--info">
+  <h4>Please note the following: </h4>
+<ul>
+  <li>spark-avro module needs to be specified in --packages as it is not included with spark-shell by default</li>
+  <li>spark-avro and spark versions must match (we have used 2.4.4 for both above)</li>
+  <li>we have used hudi-spark-bundle built for scala 2.11 since the spark-avro module used also depends on 2.11. 
+         If spark-avro_2.12 is used, correspondingly hudi-spark-bundle_2.12 needs to be used. </li>
+</ul>
+</div>
+
+Setup table name, base path and a data generator to generate records for this guide.
+
+```python
+# pyspark
+tableName = "hudi_trips_cow"
+basePath = "file:///tmp/hudi_trips_cow"
+dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
+```
+
+The [DataGenerator](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L50) 
+can generate sample inserts and updates based on the the sample trip schema [here](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L57)
+{: .notice--info}
+
+
+## Insert data
+
+Generate some new trips, load them into a DataFrame and write the DataFrame into the Hudi table as below.
+
+```python
+# pyspark
+inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
+df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
+
+hudi_options = {
+  'hoodie.table.name': tableName,
+  'hoodie.datasource.write.recordkey.field': 'uuid',
+  'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+  'hoodie.datasource.write.table.name': tableName,
+  'hoodie.datasource.write.operation': 'insert',
+  'hoodie.datasource.write.precombine.field': 'ts',
+  'hoodie.upsert.shuffle.parallelism': 2, 
+  'hoodie.insert.shuffle.parallelism': 2
+}
+
+df.write.format("hudi"). \
+  options(**hudi_options). \
+  mode("overwrite"). \
+  save(basePath)
+```
+
+`mode(Overwrite)` overwrites and recreates the table if it already exists.
+You can check the data generated under `/tmp/hudi_trips_cow/<region>/<country>/<city>/`. We provided a record key 
+(`uuid` in [schema](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L58)), partition field (`region/county/city`) and combine logic (`ts` in 
+[schema](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L58)) to ensure trip records are unique within each partition. For more info, refer to 
+[Modeling data stored in Hudi](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=113709185#FAQ-HowdoImodelthedatastoredinHudi)
+and for info on ways to ingest data into Hudi, refer to [Writing Hudi Tables](/docs/writing_data.html).
+Here we are using the default write operation : `upsert`. If you have a workload without updates, you can also issue 
+`insert` or `bulk_insert` operations which could be faster. To know more, refer to [Write operations](/docs/writing_data#write-operations)
+{: .notice--info}
+
+## Query data 
+
+Load the data files into a DataFrame.
+
+```python
+# pyspark
+tripsSnapshotDF = spark. \
+  read. \
+  format("hudi"). \
+  load(basePath + "/*/*/*/*")
+
+tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
+
+spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
+spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()
+```
+
+This query provides snapshot querying of the ingested data. Since our partition path (`region/country/city`) is 3 levels nested 
+from base path we ve used `load(basePath + "/*/*/*/*")`. 
+Refer to [Table types and queries](/docs/concepts#table-types--queries) for more info on all table types and query types supported.
+{: .notice--info}
+
+## Update data
+
+This is similar to inserting new data. Generate updates to existing trips using the data generator, load into a DataFrame 
+and write DataFrame into the hudi table.
+
+```python
+# pyspark
+updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
+df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
+df.write.format("hudi"). \
+  options(**hudi_options). \
+  mode("append"). \
+  save(basePath)
+```
+
+Notice that the save mode is now `Append`. In general, always use append mode unless you are trying to create the table for the first time.
+[Querying](#query-data) the data again will now show updated trips. Each write operation generates a new [commit](http://hudi.incubator.apache.org/docs/concepts.html) 
+denoted by the timestamp. Look for changes in `_hoodie_commit_time`, `rider`, `driver` fields for the same `_hoodie_record_key`s in previous commit. 
+{: .notice--info}
+
+## Incremental query
+
+Hudi also provides capability to obtain a stream of records that changed since given commit timestamp. 
+This can be achieved using Hudi's incremental querying and providing a begin time from which changes need to be streamed. 
+We do not need to specify endTime, if we want all changes after the given commit (as is the common case). 
+
+```python
+# pyspark
+# reload data
+spark. \
+  read. \
+  format("hudi"). \
+  load(basePath + "/*/*/*/*"). \
+  createOrReplaceTempView("hudi_trips_snapshot")
+
+commits = list(map(lambda row: row[0], spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").limit(50).collect()))
+beginTime = commits[len(commits) - 2] # commit time we are interested in
+
+# incrementally query data
+incremental_read_options = {
+  'hoodie.datasource.query.type': 'incremental',
+  'hoodie.datasource.read.begin.instanttime': 'beginTime',
+}
+
+tripsIncrementalDF = spark.read.format("hudi"). \
+  options(**incremental_read_options). \
+  load(basePath)
+tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
+
+spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()
+```
+
+This will give all changes that happened after the beginTime commit with the filter of fare > 20.0. The unique thing about this
+feature is that it now lets you author streaming pipelines on batch data.
+{: .notice--info}
+
+```
+
+## Incremental query
+
+Hudi also provides capability to obtain a stream of records that changed since given commit timestamp. 
+This can be achieved using Hudi's incremental querying and providing a begin time from which changes need to be streamed. 
+We do not need to specify endTime, if we want all changes after the given commit (as is the common case). 
+
+```python
+# pyspark
+# reload data
+spark. \
+  read. \
+  format("hudi"). \
+  load(basePath + "/*/*/*/*"). \
+  createOrReplaceTempView("hudi_trips_snapshot")
+
+commits = list(map(lambda row: row[0], spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").limit(50).collect()))
+beginTime = commits[len(commits) - 2] # commit time we are interested in
+
+# incrementally query data
+incremental_read_options = {
+  'hoodie.datasource.query.type': 'incremental',
+  'hoodie.datasource.read.begin.instanttime': 'beginTime',
+}
+
+tripsIncrementalDF = spark.read.format("hudi"). \
+  options(**incremental_read_options). \
+  load(basePath)
+tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
+
+spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()
+```
+
+This will give all changes that happened after the beginTime commit with the filter of fare > 20.0. The unique thing about this
+feature is that it now lets you author streaming pipelines on batch data.
+{: .notice--info}

Review comment:
       @EdwinGuo - The incremental query section is already there between the lines 332-368, please remove this duplicate content.

##########
File path: docs/_docs/1_1_quick_start_guide.md
##########
@@ -204,6 +213,262 @@ spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
 ```
 Note: Only `Append` mode is supported for delete operation.
 
+# Pyspark example
+## Setup
+
+Hudi works with Spark-2.x versions. You can follow instructions [here](https://spark.apache.org/downloads.html) for setting up spark. 
+From the extracted directory run spark-shell with Hudi as:
+
+```python
+# pyspark
+export PYSPARK_PYTHON=$(which python3)
+spark-2.4.4-bin-hadoop2.7/bin/pyspark \
+  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
+  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
+```
+
+<div class="notice--info">
+  <h4>Please note the following: </h4>
+<ul>
+  <li>spark-avro module needs to be specified in --packages as it is not included with spark-shell by default</li>
+  <li>spark-avro and spark versions must match (we have used 2.4.4 for both above)</li>
+  <li>we have used hudi-spark-bundle built for scala 2.11 since the spark-avro module used also depends on 2.11. 
+         If spark-avro_2.12 is used, correspondingly hudi-spark-bundle_2.12 needs to be used. </li>
+</ul>
+</div>
+
+Setup table name, base path and a data generator to generate records for this guide.
+
+```python
+# pyspark
+tableName = "hudi_trips_cow"
+basePath = "file:///tmp/hudi_trips_cow"
+dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
+```
+
+The [DataGenerator](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L50) 
+can generate sample inserts and updates based on the the sample trip schema [here](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L57)
+{: .notice--info}
+
+
+## Insert data
+
+Generate some new trips, load them into a DataFrame and write the DataFrame into the Hudi table as below.
+
+```python
+# pyspark
+inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
+df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
+
+hudi_options = {
+  'hoodie.table.name': tableName,
+  'hoodie.datasource.write.recordkey.field': 'uuid',
+  'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+  'hoodie.datasource.write.table.name': tableName,
+  'hoodie.datasource.write.operation': 'insert',
+  'hoodie.datasource.write.precombine.field': 'ts',
+  'hoodie.upsert.shuffle.parallelism': 2, 
+  'hoodie.insert.shuffle.parallelism': 2
+}
+
+df.write.format("hudi"). \
+  options(**hudi_options). \
+  mode("overwrite"). \
+  save(basePath)
+```
+
+`mode(Overwrite)` overwrites and recreates the table if it already exists.
+You can check the data generated under `/tmp/hudi_trips_cow/<region>/<country>/<city>/`. We provided a record key 
+(`uuid` in [schema](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L58)), partition field (`region/county/city`) and combine logic (`ts` in 
+[schema](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L58)) to ensure trip records are unique within each partition. For more info, refer to 
+[Modeling data stored in Hudi](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=113709185#FAQ-HowdoImodelthedatastoredinHudi)
+and for info on ways to ingest data into Hudi, refer to [Writing Hudi Tables](/docs/writing_data.html).
+Here we are using the default write operation : `upsert`. If you have a workload without updates, you can also issue 
+`insert` or `bulk_insert` operations which could be faster. To know more, refer to [Write operations](/docs/writing_data#write-operations)
+{: .notice--info}
+
+## Query data 
+
+Load the data files into a DataFrame.
+
+```python
+# pyspark
+tripsSnapshotDF = spark. \
+  read. \
+  format("hudi"). \
+  load(basePath + "/*/*/*/*")
+
+tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
+
+spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
+spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()
+```
+
+This query provides snapshot querying of the ingested data. Since our partition path (`region/country/city`) is 3 levels nested 
+from base path we ve used `load(basePath + "/*/*/*/*")`. 
+Refer to [Table types and queries](/docs/concepts#table-types--queries) for more info on all table types and query types supported.
+{: .notice--info}
+
+## Update data
+
+This is similar to inserting new data. Generate updates to existing trips using the data generator, load into a DataFrame 
+and write DataFrame into the hudi table.
+
+```python
+# pyspark
+updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
+df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
+df.write.format("hudi"). \
+  options(**hudi_options). \
+  mode("append"). \
+  save(basePath)
+```
+
+Notice that the save mode is now `Append`. In general, always use append mode unless you are trying to create the table for the first time.
+[Querying](#query-data) the data again will now show updated trips. Each write operation generates a new [commit](http://hudi.incubator.apache.org/docs/concepts.html) 
+denoted by the timestamp. Look for changes in `_hoodie_commit_time`, `rider`, `driver` fields for the same `_hoodie_record_key`s in previous commit. 
+{: .notice--info}
+
+## Incremental query
+
+Hudi also provides capability to obtain a stream of records that changed since given commit timestamp. 
+This can be achieved using Hudi's incremental querying and providing a begin time from which changes need to be streamed. 
+We do not need to specify endTime, if we want all changes after the given commit (as is the common case). 
+
+```python
+# pyspark
+# reload data
+spark. \
+  read. \
+  format("hudi"). \
+  load(basePath + "/*/*/*/*"). \
+  createOrReplaceTempView("hudi_trips_snapshot")
+
+commits = list(map(lambda row: row[0], spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").limit(50).collect()))
+beginTime = commits[len(commits) - 2] # commit time we are interested in
+
+# incrementally query data
+incremental_read_options = {
+  'hoodie.datasource.query.type': 'incremental',
+  'hoodie.datasource.read.begin.instanttime': 'beginTime',

Review comment:
       @EdwinGuo - Please remove the quotes around the beginTime, its a variable and not a string literal value. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] lamber-ken commented on issue #1526: [HUDI-1526] Add pyspark example in quickstart

Posted by GitBox <gi...@apache.org>.
lamber-ken commented on issue #1526:
URL: https://github.com/apache/incubator-hudi/pull/1526#issuecomment-618086091


   > @vingov I had copied the https://github.com/apache/spark/blob/15462e1a8fa8da54ac51f4d21f567f3c288e6701/docs/js/main.js and reference the js lib like: https://raw.githubusercontent.com/apache/spark/46be1e01e977788f00f1f5aa8d64bc5f191bc578/docs/sql-getting-started.md but I got no luck. Maybe re-orgnize the page with spark-shell code snippet on the top of the quick start guide and pyspark on the bottom of the page? @lamber-ken what do you think?
   
   First of all, this is a good contribution 👍, it's not a good idea to just copy the `main.js`.
   OK, If you have any difficulty doing this, pull this into a separate page is better. :)


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] vingov commented on issue #1526: [HUDI-1526] Add pyspark example in quickstart

Posted by GitBox <gi...@apache.org>.
vingov commented on issue #1526:
URL: https://github.com/apache/incubator-hudi/pull/1526#issuecomment-618173541


   @EdwinGuo - Thanks for trying multiple options, I thought it would be easy to create those language-specific tabs, if it too much work, we can create a separate page for pyspark code.
   
   If we were using docusaurus, it would be super easy to create these tab navigation - https://docusaurus.io/docs/en/doc-markdown#language-specific-code-tabs
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] lamber-ken commented on a change in pull request #1526: [HUDI-783] Add pyspark example in quickstart

Posted by GitBox <gi...@apache.org>.
lamber-ken commented on a change in pull request #1526:
URL: https://github.com/apache/incubator-hudi/pull/1526#discussion_r418972065



##########
File path: docs/_docs/1_1_quick_start_guide.md
##########
@@ -204,6 +213,224 @@ spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
 ```
 Note: Only `Append` mode is supported for delete operation.
 
+# Pyspark example
+## Setup
+
+Hudi works with Spark-2.x versions. You can follow instructions [here](https://spark.apache.org/downloads.html) for setting up spark. 
+From the extracted directory run spark-shell with Hudi as:
+
+```python
+# pyspark
+export PYSPARK_PYTHON=$(which python3)
+spark-2.4.4-bin-hadoop2.7/bin/pyspark \
+  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
+  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
+```
+
+<div class="notice--info">
+  <h4>Please note the following: </h4>
+<ul>
+  <li>spark-avro module needs to be specified in --packages as it is not included with spark-shell by default</li>
+  <li>spark-avro and spark versions must match (we have used 2.4.4 for both above)</li>
+  <li>we have used hudi-spark-bundle built for scala 2.11 since the spark-avro module used also depends on 2.11. 
+         If spark-avro_2.12 is used, correspondingly hudi-spark-bundle_2.12 needs to be used. </li>
+</ul>
+</div>
+
+Setup table name, base path and a data generator to generate records for this guide.
+
+```python
+# pyspark
+tableName = "hudi_trips_cow"
+basePath = "file:///tmp/hudi_trips_cow"
+dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
+```
+
+The [DataGenerator](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L50) 
+can generate sample inserts and updates based on the the sample trip schema [here](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L57)
+{: .notice--info}
+
+
+## Insert data
+
+Generate some new trips, load them into a DataFrame and write the DataFrame into the Hudi table as below.
+
+```python
+# pyspark
+inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
+df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
+
+hudi_options = {
+  'hoodie.table.name': tableName,
+  'hoodie.datasource.write.recordkey.field': 'uuid',
+  'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+  'hoodie.datasource.write.table.name': tableName,
+  'hoodie.datasource.write.operation': 'insert',
+  'hoodie.datasource.write.precombine.field': 'ts',
+  'hoodie.upsert.shuffle.parallelism': 2, 
+  'hoodie.insert.shuffle.parallelism': 2
+}
+
+df.write.format("hudi"). \
+  options(**hudi_options). \
+  mode("overwrite"). \
+  save(basePath)
+```
+
+`mode(Overwrite)` overwrites and recreates the table if it already exists.
+You can check the data generated under `/tmp/hudi_trips_cow/<region>/<country>/<city>/`. We provided a record key 
+(`uuid` in [schema](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L58)), partition field (`region/county/city`) and combine logic (`ts` in 
+[schema](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L58)) to ensure trip records are unique within each partition. For more info, refer to 
+[Modeling data stored in Hudi](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=113709185#FAQ-HowdoImodelthedatastoredinHudi)
+and for info on ways to ingest data into Hudi, refer to [Writing Hudi Tables](/docs/writing_data.html).
+Here we are using the default write operation : `upsert`. If you have a workload without updates, you can also issue 
+`insert` or `bulk_insert` operations which could be faster. To know more, refer to [Write operations](/docs/writing_data#write-operations)
+{: .notice--info}
+
+## Query data 
+
+Load the data files into a DataFrame.
+
+```python
+# pyspark
+tripsSnapshotDF = spark. \
+  read. \
+  format("hudi"). \
+  load(basePath + "/*/*/*/*")
+
+tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
+
+spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
+spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()
+```
+
+This query provides snapshot querying of the ingested data. Since our partition path (`region/country/city`) is 3 levels nested 
+from base path we ve used `load(basePath + "/*/*/*/*")`. 
+Refer to [Table types and queries](/docs/concepts#table-types--queries) for more info on all table types and query types supported.
+{: .notice--info}
+
+## Update data
+
+This is similar to inserting new data. Generate updates to existing trips using the data generator, load into a DataFrame 
+and write DataFrame into the hudi table.
+
+```python
+# pyspark
+updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
+df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
+df.write.format("hudi"). \
+  options(**hudi_options). \
+  mode("append"). \
+  save(basePath)
+```
+
+Notice that the save mode is now `Append`. In general, always use append mode unless you are trying to create the table for the first time.
+[Querying](#query-data) the data again will now show updated trips. Each write operation generates a new [commit](http://hudi.incubator.apache.org/docs/concepts.html) 
+denoted by the timestamp. Look for changes in `_hoodie_commit_time`, `rider`, `driver` fields for the same `_hoodie_record_key`s in previous commit. 
+{: .notice--info}
+
+## Incremental query
+
+Hudi also provides capability to obtain a stream of records that changed since given commit timestamp. 
+This can be achieved using Hudi's incremental querying and providing a begin time from which changes need to be streamed. 
+We do not need to specify endTime, if we want all changes after the given commit (as is the common case). 
+
+```python
+# pyspark
+# reload data
+spark. \
+  read. \
+  format("hudi"). \
+  load(basePath + "/*/*/*/*"). \
+  createOrReplaceTempView("hudi_trips_snapshot")
+
+commits = list(map(lambda row: row[0], spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").limit(50).collect()))
+beginTime = commits[len(commits) - 2] # commit time we are interested in
+
+# incrementally query data
+incremental_read_options = {
+  'hoodie.datasource.query.type': 'incremental',
+  'hoodie.datasource.read.begin.instanttime': beginTime,
+}
+
+tripsIncrementalDF = spark.read.format("hudi"). \
+  options(**incremental_read_options). \
+  load(basePath)
+tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
+
+spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()
+```
+
+This will give all changes that happened after the beginTime commit with the filter of fare > 20.0. The unique thing about this
+feature is that it now lets you author streaming pipelines on batch data.
+{: .notice--info}
+
+## Point in time query
+
+Lets look at how to query data as of a specific time. The specific time can be represented by pointing endTime to a 
+specific commit time and beginTime to "000" (denoting earliest possible commit time). 
+
+```python
+# pyspark
+beginTime = "000" # Represents all commits > this time.
+endTime = commits[len(commits) - 2]
+
+# query point in time data
+point_in_time_read_options = {**incremental_read_options, 
+                              **{"hoodie.datasource.read.end.instanttime": endTime,
+                                "hoodie.datasource.read.begin.instanttime": beginTime}}

Review comment:
       hi @EdwinGuo, syntax error here, please use
   ```
   point_in_time_read_options = {
     'hoodie.datasource.query.type': 'incremental',
     'hoodie.datasource.read.end.instanttime': endTime,
     'hoodie.datasource.read.begin.instanttime': beginTime
   }
   ```

##########
File path: docs/_docs/1_1_quick_start_guide.md
##########
@@ -204,6 +213,224 @@ spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
 ```
 Note: Only `Append` mode is supported for delete operation.
 
+# Pyspark example
+## Setup
+
+Hudi works with Spark-2.x versions. You can follow instructions [here](https://spark.apache.org/downloads.html) for setting up spark. 
+From the extracted directory run spark-shell with Hudi as:
+
+```python
+# pyspark
+export PYSPARK_PYTHON=$(which python3)
+spark-2.4.4-bin-hadoop2.7/bin/pyspark \
+  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
+  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
+```
+
+<div class="notice--info">
+  <h4>Please note the following: </h4>
+<ul>
+  <li>spark-avro module needs to be specified in --packages as it is not included with spark-shell by default</li>
+  <li>spark-avro and spark versions must match (we have used 2.4.4 for both above)</li>
+  <li>we have used hudi-spark-bundle built for scala 2.11 since the spark-avro module used also depends on 2.11. 
+         If spark-avro_2.12 is used, correspondingly hudi-spark-bundle_2.12 needs to be used. </li>
+</ul>
+</div>
+
+Setup table name, base path and a data generator to generate records for this guide.
+
+```python
+# pyspark
+tableName = "hudi_trips_cow"
+basePath = "file:///tmp/hudi_trips_cow"
+dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
+```
+
+The [DataGenerator](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L50) 
+can generate sample inserts and updates based on the the sample trip schema [here](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L57)
+{: .notice--info}
+
+
+## Insert data
+
+Generate some new trips, load them into a DataFrame and write the DataFrame into the Hudi table as below.
+
+```python
+# pyspark
+inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
+df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
+
+hudi_options = {
+  'hoodie.table.name': tableName,
+  'hoodie.datasource.write.recordkey.field': 'uuid',
+  'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+  'hoodie.datasource.write.table.name': tableName,
+  'hoodie.datasource.write.operation': 'insert',
+  'hoodie.datasource.write.precombine.field': 'ts',
+  'hoodie.upsert.shuffle.parallelism': 2, 
+  'hoodie.insert.shuffle.parallelism': 2
+}
+
+df.write.format("hudi"). \
+  options(**hudi_options). \
+  mode("overwrite"). \
+  save(basePath)
+```
+
+`mode(Overwrite)` overwrites and recreates the table if it already exists.
+You can check the data generated under `/tmp/hudi_trips_cow/<region>/<country>/<city>/`. We provided a record key 
+(`uuid` in [schema](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L58)), partition field (`region/county/city`) and combine logic (`ts` in 
+[schema](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L58)) to ensure trip records are unique within each partition. For more info, refer to 
+[Modeling data stored in Hudi](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=113709185#FAQ-HowdoImodelthedatastoredinHudi)
+and for info on ways to ingest data into Hudi, refer to [Writing Hudi Tables](/docs/writing_data.html).
+Here we are using the default write operation : `upsert`. If you have a workload without updates, you can also issue 
+`insert` or `bulk_insert` operations which could be faster. To know more, refer to [Write operations](/docs/writing_data#write-operations)
+{: .notice--info}
+
+## Query data 
+
+Load the data files into a DataFrame.
+
+```python
+# pyspark
+tripsSnapshotDF = spark. \
+  read. \
+  format("hudi"). \
+  load(basePath + "/*/*/*/*")
+
+tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
+
+spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
+spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()
+```
+
+This query provides snapshot querying of the ingested data. Since our partition path (`region/country/city`) is 3 levels nested 
+from base path we ve used `load(basePath + "/*/*/*/*")`. 
+Refer to [Table types and queries](/docs/concepts#table-types--queries) for more info on all table types and query types supported.
+{: .notice--info}
+
+## Update data
+
+This is similar to inserting new data. Generate updates to existing trips using the data generator, load into a DataFrame 
+and write DataFrame into the hudi table.
+
+```python
+# pyspark
+updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
+df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
+df.write.format("hudi"). \
+  options(**hudi_options). \
+  mode("append"). \
+  save(basePath)
+```
+
+Notice that the save mode is now `Append`. In general, always use append mode unless you are trying to create the table for the first time.
+[Querying](#query-data) the data again will now show updated trips. Each write operation generates a new [commit](http://hudi.incubator.apache.org/docs/concepts.html) 
+denoted by the timestamp. Look for changes in `_hoodie_commit_time`, `rider`, `driver` fields for the same `_hoodie_record_key`s in previous commit. 
+{: .notice--info}
+
+## Incremental query
+
+Hudi also provides capability to obtain a stream of records that changed since given commit timestamp. 
+This can be achieved using Hudi's incremental querying and providing a begin time from which changes need to be streamed. 
+We do not need to specify endTime, if we want all changes after the given commit (as is the common case). 
+
+```python
+# pyspark
+# reload data
+spark. \
+  read. \
+  format("hudi"). \
+  load(basePath + "/*/*/*/*"). \
+  createOrReplaceTempView("hudi_trips_snapshot")
+
+commits = list(map(lambda row: row[0], spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").limit(50).collect()))
+beginTime = commits[len(commits) - 2] # commit time we are interested in
+
+# incrementally query data
+incremental_read_options = {
+  'hoodie.datasource.query.type': 'incremental',
+  'hoodie.datasource.read.begin.instanttime': beginTime,
+}
+
+tripsIncrementalDF = spark.read.format("hudi"). \
+  options(**incremental_read_options). \
+  load(basePath)
+tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
+
+spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()
+```
+
+This will give all changes that happened after the beginTime commit with the filter of fare > 20.0. The unique thing about this
+feature is that it now lets you author streaming pipelines on batch data.
+{: .notice--info}
+
+## Point in time query
+
+Lets look at how to query data as of a specific time. The specific time can be represented by pointing endTime to a 
+specific commit time and beginTime to "000" (denoting earliest possible commit time). 
+
+```python
+# pyspark
+beginTime = "000" # Represents all commits > this time.
+endTime = commits[len(commits) - 2]
+
+# query point in time data
+point_in_time_read_options = {**incremental_read_options, 
+                              **{"hoodie.datasource.read.end.instanttime": endTime,
+                                "hoodie.datasource.read.begin.instanttime": beginTime}}
+tripsPointInTimeDF = spark.read.format("hudi"). \
+  options(**point_in_time_read_options). \
+  load(basePath)
+
+tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
+spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
+```
+
+## Delete data {#deletes}
+Delete records for the HoodieKeys passed in.
+
+Note: Only `Append` mode is supported for delete operation.
+
+```python
+# pyspark
+# fetch total records count
+spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
+# fetch two records to be deleted
+ds = spark.sql("select uuid, partitionPath from hudi_trips_snapshot").limit(2)
+
+# issue deletes
+hudi_delete_options = {
+'hoodie.table.name': tableName,
+'hoodie.datasource.write.recordkey.field': 'uuid',
+'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+'hoodie.datasource.write.table.name': tableName,
+'hoodie.datasource.write.operation': 'delete',
+'hoodie.datasource.write.precombine.field': 'ts',
+'hoodie.upsert.shuffle.parallelism': 2, 
+'hoodie.insert.shuffle.parallelism': 2

Review comment:
       here, need two blank space.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] EdwinGuo commented on a change in pull request #1526: [HUDI-783] Add pyspark example in quickstart

Posted by GitBox <gi...@apache.org>.
EdwinGuo commented on a change in pull request #1526:
URL: https://github.com/apache/incubator-hudi/pull/1526#discussion_r419103929



##########
File path: docs/_docs/1_1_quick_start_guide.md
##########
@@ -204,6 +213,224 @@ spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
 ```
 Note: Only `Append` mode is supported for delete operation.
 
+# Pyspark example
+## Setup
+
+Hudi works with Spark-2.x versions. You can follow instructions [here](https://spark.apache.org/downloads.html) for setting up spark. 
+From the extracted directory run spark-shell with Hudi as:
+
+```python
+# pyspark
+export PYSPARK_PYTHON=$(which python3)
+spark-2.4.4-bin-hadoop2.7/bin/pyspark \
+  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
+  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
+```
+
+<div class="notice--info">
+  <h4>Please note the following: </h4>
+<ul>
+  <li>spark-avro module needs to be specified in --packages as it is not included with spark-shell by default</li>
+  <li>spark-avro and spark versions must match (we have used 2.4.4 for both above)</li>
+  <li>we have used hudi-spark-bundle built for scala 2.11 since the spark-avro module used also depends on 2.11. 
+         If spark-avro_2.12 is used, correspondingly hudi-spark-bundle_2.12 needs to be used. </li>
+</ul>
+</div>
+
+Setup table name, base path and a data generator to generate records for this guide.
+
+```python
+# pyspark
+tableName = "hudi_trips_cow"
+basePath = "file:///tmp/hudi_trips_cow"
+dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
+```
+
+The [DataGenerator](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L50) 
+can generate sample inserts and updates based on the the sample trip schema [here](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L57)
+{: .notice--info}
+
+
+## Insert data
+
+Generate some new trips, load them into a DataFrame and write the DataFrame into the Hudi table as below.
+
+```python
+# pyspark
+inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
+df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
+
+hudi_options = {
+  'hoodie.table.name': tableName,
+  'hoodie.datasource.write.recordkey.field': 'uuid',
+  'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+  'hoodie.datasource.write.table.name': tableName,
+  'hoodie.datasource.write.operation': 'insert',
+  'hoodie.datasource.write.precombine.field': 'ts',
+  'hoodie.upsert.shuffle.parallelism': 2, 
+  'hoodie.insert.shuffle.parallelism': 2
+}
+
+df.write.format("hudi"). \
+  options(**hudi_options). \
+  mode("overwrite"). \
+  save(basePath)
+```
+
+`mode(Overwrite)` overwrites and recreates the table if it already exists.
+You can check the data generated under `/tmp/hudi_trips_cow/<region>/<country>/<city>/`. We provided a record key 
+(`uuid` in [schema](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L58)), partition field (`region/county/city`) and combine logic (`ts` in 
+[schema](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L58)) to ensure trip records are unique within each partition. For more info, refer to 
+[Modeling data stored in Hudi](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=113709185#FAQ-HowdoImodelthedatastoredinHudi)
+and for info on ways to ingest data into Hudi, refer to [Writing Hudi Tables](/docs/writing_data.html).
+Here we are using the default write operation : `upsert`. If you have a workload without updates, you can also issue 
+`insert` or `bulk_insert` operations which could be faster. To know more, refer to [Write operations](/docs/writing_data#write-operations)
+{: .notice--info}
+
+## Query data 
+
+Load the data files into a DataFrame.
+
+```python
+# pyspark
+tripsSnapshotDF = spark. \
+  read. \
+  format("hudi"). \
+  load(basePath + "/*/*/*/*")
+
+tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
+
+spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
+spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()
+```
+
+This query provides snapshot querying of the ingested data. Since our partition path (`region/country/city`) is 3 levels nested 
+from base path we ve used `load(basePath + "/*/*/*/*")`. 
+Refer to [Table types and queries](/docs/concepts#table-types--queries) for more info on all table types and query types supported.
+{: .notice--info}
+
+## Update data
+
+This is similar to inserting new data. Generate updates to existing trips using the data generator, load into a DataFrame 
+and write DataFrame into the hudi table.
+
+```python
+# pyspark
+updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
+df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
+df.write.format("hudi"). \
+  options(**hudi_options). \
+  mode("append"). \
+  save(basePath)
+```
+
+Notice that the save mode is now `Append`. In general, always use append mode unless you are trying to create the table for the first time.
+[Querying](#query-data) the data again will now show updated trips. Each write operation generates a new [commit](http://hudi.incubator.apache.org/docs/concepts.html) 
+denoted by the timestamp. Look for changes in `_hoodie_commit_time`, `rider`, `driver` fields for the same `_hoodie_record_key`s in previous commit. 
+{: .notice--info}
+
+## Incremental query
+
+Hudi also provides capability to obtain a stream of records that changed since given commit timestamp. 
+This can be achieved using Hudi's incremental querying and providing a begin time from which changes need to be streamed. 
+We do not need to specify endTime, if we want all changes after the given commit (as is the common case). 
+
+```python
+# pyspark
+# reload data
+spark. \
+  read. \
+  format("hudi"). \
+  load(basePath + "/*/*/*/*"). \
+  createOrReplaceTempView("hudi_trips_snapshot")
+
+commits = list(map(lambda row: row[0], spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").limit(50).collect()))
+beginTime = commits[len(commits) - 2] # commit time we are interested in
+
+# incrementally query data
+incremental_read_options = {
+  'hoodie.datasource.query.type': 'incremental',
+  'hoodie.datasource.read.begin.instanttime': beginTime,
+}
+
+tripsIncrementalDF = spark.read.format("hudi"). \
+  options(**incremental_read_options). \
+  load(basePath)
+tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
+
+spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()
+```
+
+This will give all changes that happened after the beginTime commit with the filter of fare > 20.0. The unique thing about this
+feature is that it now lets you author streaming pipelines on batch data.
+{: .notice--info}
+
+## Point in time query
+
+Lets look at how to query data as of a specific time. The specific time can be represented by pointing endTime to a 
+specific commit time and beginTime to "000" (denoting earliest possible commit time). 
+
+```python
+# pyspark
+beginTime = "000" # Represents all commits > this time.
+endTime = commits[len(commits) - 2]
+
+# query point in time data
+point_in_time_read_options = {**incremental_read_options, 
+                              **{"hoodie.datasource.read.end.instanttime": endTime,
+                                "hoodie.datasource.read.begin.instanttime": beginTime}}
+tripsPointInTimeDF = spark.read.format("hudi"). \
+  options(**point_in_time_read_options). \
+  load(basePath)
+
+tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
+spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
+```
+
+## Delete data {#deletes}
+Delete records for the HoodieKeys passed in.
+
+Note: Only `Append` mode is supported for delete operation.
+
+```python
+# pyspark
+# fetch total records count
+spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
+# fetch two records to be deleted
+ds = spark.sql("select uuid, partitionPath from hudi_trips_snapshot").limit(2)
+
+# issue deletes
+hudi_delete_options = {
+'hoodie.table.name': tableName,
+'hoodie.datasource.write.recordkey.field': 'uuid',
+'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+'hoodie.datasource.write.table.name': tableName,
+'hoodie.datasource.write.operation': 'delete',
+'hoodie.datasource.write.precombine.field': 'ts',
+'hoodie.upsert.shuffle.parallelism': 2, 
+'hoodie.insert.shuffle.parallelism': 2

Review comment:
       Addressed @lamber-ken  Thanks

##########
File path: docs/_docs/1_1_quick_start_guide.md
##########
@@ -204,6 +213,224 @@ spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
 ```
 Note: Only `Append` mode is supported for delete operation.
 
+# Pyspark example
+## Setup
+
+Hudi works with Spark-2.x versions. You can follow instructions [here](https://spark.apache.org/downloads.html) for setting up spark. 
+From the extracted directory run spark-shell with Hudi as:
+
+```python
+# pyspark
+export PYSPARK_PYTHON=$(which python3)
+spark-2.4.4-bin-hadoop2.7/bin/pyspark \
+  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
+  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
+```
+
+<div class="notice--info">
+  <h4>Please note the following: </h4>
+<ul>
+  <li>spark-avro module needs to be specified in --packages as it is not included with spark-shell by default</li>
+  <li>spark-avro and spark versions must match (we have used 2.4.4 for both above)</li>
+  <li>we have used hudi-spark-bundle built for scala 2.11 since the spark-avro module used also depends on 2.11. 
+         If spark-avro_2.12 is used, correspondingly hudi-spark-bundle_2.12 needs to be used. </li>
+</ul>
+</div>
+
+Setup table name, base path and a data generator to generate records for this guide.
+
+```python
+# pyspark
+tableName = "hudi_trips_cow"
+basePath = "file:///tmp/hudi_trips_cow"
+dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
+```
+
+The [DataGenerator](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L50) 
+can generate sample inserts and updates based on the the sample trip schema [here](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L57)
+{: .notice--info}
+
+
+## Insert data
+
+Generate some new trips, load them into a DataFrame and write the DataFrame into the Hudi table as below.
+
+```python
+# pyspark
+inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
+df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
+
+hudi_options = {
+  'hoodie.table.name': tableName,
+  'hoodie.datasource.write.recordkey.field': 'uuid',
+  'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+  'hoodie.datasource.write.table.name': tableName,
+  'hoodie.datasource.write.operation': 'insert',
+  'hoodie.datasource.write.precombine.field': 'ts',
+  'hoodie.upsert.shuffle.parallelism': 2, 
+  'hoodie.insert.shuffle.parallelism': 2
+}
+
+df.write.format("hudi"). \
+  options(**hudi_options). \
+  mode("overwrite"). \
+  save(basePath)
+```
+
+`mode(Overwrite)` overwrites and recreates the table if it already exists.
+You can check the data generated under `/tmp/hudi_trips_cow/<region>/<country>/<city>/`. We provided a record key 
+(`uuid` in [schema](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L58)), partition field (`region/county/city`) and combine logic (`ts` in 
+[schema](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L58)) to ensure trip records are unique within each partition. For more info, refer to 
+[Modeling data stored in Hudi](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=113709185#FAQ-HowdoImodelthedatastoredinHudi)
+and for info on ways to ingest data into Hudi, refer to [Writing Hudi Tables](/docs/writing_data.html).
+Here we are using the default write operation : `upsert`. If you have a workload without updates, you can also issue 
+`insert` or `bulk_insert` operations which could be faster. To know more, refer to [Write operations](/docs/writing_data#write-operations)
+{: .notice--info}
+
+## Query data 
+
+Load the data files into a DataFrame.
+
+```python
+# pyspark
+tripsSnapshotDF = spark. \
+  read. \
+  format("hudi"). \
+  load(basePath + "/*/*/*/*")
+
+tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
+
+spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
+spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()
+```
+
+This query provides snapshot querying of the ingested data. Since our partition path (`region/country/city`) is 3 levels nested 
+from base path we ve used `load(basePath + "/*/*/*/*")`. 
+Refer to [Table types and queries](/docs/concepts#table-types--queries) for more info on all table types and query types supported.
+{: .notice--info}
+
+## Update data
+
+This is similar to inserting new data. Generate updates to existing trips using the data generator, load into a DataFrame 
+and write DataFrame into the hudi table.
+
+```python
+# pyspark
+updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
+df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
+df.write.format("hudi"). \
+  options(**hudi_options). \
+  mode("append"). \
+  save(basePath)
+```
+
+Notice that the save mode is now `Append`. In general, always use append mode unless you are trying to create the table for the first time.
+[Querying](#query-data) the data again will now show updated trips. Each write operation generates a new [commit](http://hudi.incubator.apache.org/docs/concepts.html) 
+denoted by the timestamp. Look for changes in `_hoodie_commit_time`, `rider`, `driver` fields for the same `_hoodie_record_key`s in previous commit. 
+{: .notice--info}
+
+## Incremental query
+
+Hudi also provides capability to obtain a stream of records that changed since given commit timestamp. 
+This can be achieved using Hudi's incremental querying and providing a begin time from which changes need to be streamed. 
+We do not need to specify endTime, if we want all changes after the given commit (as is the common case). 
+
+```python
+# pyspark
+# reload data
+spark. \
+  read. \
+  format("hudi"). \
+  load(basePath + "/*/*/*/*"). \
+  createOrReplaceTempView("hudi_trips_snapshot")
+
+commits = list(map(lambda row: row[0], spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").limit(50).collect()))
+beginTime = commits[len(commits) - 2] # commit time we are interested in
+
+# incrementally query data
+incremental_read_options = {
+  'hoodie.datasource.query.type': 'incremental',
+  'hoodie.datasource.read.begin.instanttime': beginTime,
+}
+
+tripsIncrementalDF = spark.read.format("hudi"). \
+  options(**incremental_read_options). \
+  load(basePath)
+tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
+
+spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()
+```
+
+This will give all changes that happened after the beginTime commit with the filter of fare > 20.0. The unique thing about this
+feature is that it now lets you author streaming pipelines on batch data.
+{: .notice--info}
+
+## Point in time query
+
+Lets look at how to query data as of a specific time. The specific time can be represented by pointing endTime to a 
+specific commit time and beginTime to "000" (denoting earliest possible commit time). 
+
+```python
+# pyspark
+beginTime = "000" # Represents all commits > this time.
+endTime = commits[len(commits) - 2]
+
+# query point in time data
+point_in_time_read_options = {**incremental_read_options, 
+                              **{"hoodie.datasource.read.end.instanttime": endTime,
+                                "hoodie.datasource.read.begin.instanttime": beginTime}}

Review comment:
       Addressed @lamber-ken  Thanks




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org