You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/01/25 03:13:20 UTC

[GitHub] [hudi] pengzhiwei2018 opened a new pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

pengzhiwei2018 opened a new pull request #2485:
URL: https://github.com/apache/hudi/pull/2485


   ## *Tips*
   - *Thank you very much for contributing to Apache Hudi.*
   - *Please review https://hudi.apache.org/contributing.html before opening a pull request.*
   
   ## What is the purpose of the pull request
   Support spark struct streaming read from hudi COW & MOR table.
   
   ## Brief change log
   - Implement a HoodieSource to read hudi table for spark struct streaming.
   - Implement a HoodieSourceOffset to store the latest commit time has read.
   
   ## Verify this pull request
   
   *(Please pick either of the following options)*
   
   This change added tests and can be verified as follows:
   
     - Add TestStreamingSource for testing read data from hudi COW & MOR table.
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
vinothchandar edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-772797323


   @pengzhiwei2018 I am planning to spend sometime on this as well. 
   
   High level question. does the `offset` for the streaming read map to `_hoodie_commit_seq_no` in this implementation. This way we can actually do record level streams and even resume where we left off. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pengzhiwei2018 edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-768221064


   > Can you check if this change is compatible with Spark 3.0.0?
   Hi @zhedoubushishi , The `Source` and `Offset` implement is still available  in the spark 3.0.0, But it needs to be integrated into the new `SourceProvider` before use it in the 3.0.0.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pengzhiwei2018 commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-775607006


   Hi @vinothchandar , All the comments have processed. Please take a review again when you have time. And I also file a JIRA to support record level streaming consume at [HUDI-1601](https://issues.apache.org/jira/browse/HUDI-1601).


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] codecov-io edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766519181


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=h1) Report
   > Merging [#2485](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=desc) (eedd49b) into [master](https://codecov.io/gh/apache/hudi/commit/e302c6bc12c7eb764781898fdee8ee302ef4ec10?el=desc) (e302c6b) will **decrease** coverage by `40.49%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2485/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #2485       +/-   ##
   ============================================
   - Coverage     50.18%   9.68%   -40.50%     
   + Complexity     3050      48     -3002     
   ============================================
     Files           419      53      -366     
     Lines         18931    1930    -17001     
     Branches       1948     230     -1718     
   ============================================
   - Hits           9500     187     -9313     
   + Misses         8656    1730     -6926     
   + Partials        775      13      -762     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `?` | `?` | |
   | hudiclient | `?` | `?` | |
   | hudicommon | `?` | `?` | |
   | hudiflink | `?` | `?` | |
   | hudihadoopmr | `?` | `?` | |
   | hudisparkdatasource | `?` | `?` | |
   | hudisync | `?` | `?` | |
   | huditimelineservice | `?` | `?` | |
   | hudiutilities | `9.68% <ø> (-59.75%)` | `0.00 <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
   | [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-10.00%)` | |
   | [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [...apache/hudi/utilities/sources/JsonKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkthZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-6.00%)` | |
   | [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-5.00%)` | |
   | [...lities/schema/SchemaProviderWithPostProcessor.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQcm92aWRlcldpdGhQb3N0UHJvY2Vzc29yLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | ... and [395 more](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree-more) | |
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] codecov-io commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766519181


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=h1) Report
   > Merging [#2485](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=desc) (91cf083) into [master](https://codecov.io/gh/apache/hudi/commit/e302c6bc12c7eb764781898fdee8ee302ef4ec10?el=desc) (e302c6b) will **decrease** coverage by `40.49%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2485/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #2485       +/-   ##
   ============================================
   - Coverage     50.18%   9.68%   -40.50%     
   + Complexity     3050      48     -3002     
   ============================================
     Files           419      53      -366     
     Lines         18931    1930    -17001     
     Branches       1948     230     -1718     
   ============================================
   - Hits           9500     187     -9313     
   + Misses         8656    1730     -6926     
   + Partials        775      13      -762     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `?` | `?` | |
   | hudiclient | `?` | `?` | |
   | hudicommon | `?` | `?` | |
   | hudiflink | `?` | `?` | |
   | hudihadoopmr | `?` | `?` | |
   | hudisparkdatasource | `?` | `?` | |
   | hudisync | `?` | `?` | |
   | huditimelineservice | `?` | `?` | |
   | hudiutilities | `9.68% <ø> (-59.75%)` | `0.00 <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
   | [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-10.00%)` | |
   | [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [...apache/hudi/utilities/sources/JsonKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkthZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-6.00%)` | |
   | [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-5.00%)` | |
   | [...lities/schema/SchemaProviderWithPostProcessor.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQcm92aWRlcldpdGhQb3N0UHJvY2Vzc29yLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | ... and [395 more](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree-more) | |
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] codecov-io edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766519181


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=h1) Report
   > Merging [#2485](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=desc) (5d3ec8d) into [master](https://codecov.io/gh/apache/hudi/commit/e302c6bc12c7eb764781898fdee8ee302ef4ec10?el=desc) (e302c6b) will **increase** coverage by `19.29%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2485/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree)
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #2485       +/-   ##
   =============================================
   + Coverage     50.18%   69.48%   +19.29%     
   + Complexity     3050      358     -2692     
   =============================================
     Files           419       53      -366     
     Lines         18931     1930    -17001     
     Branches       1948      230     -1718     
   =============================================
   - Hits           9500     1341     -8159     
   + Misses         8656      456     -8200     
   + Partials        775      133      -642     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `?` | `?` | |
   | hudiclient | `?` | `?` | |
   | hudicommon | `?` | `?` | |
   | hudiflink | `?` | `?` | |
   | hudihadoopmr | `?` | `?` | |
   | hudisparkdatasource | `?` | `?` | |
   | hudisync | `?` | `?` | |
   | huditimelineservice | `?` | `?` | |
   | hudiutilities | `69.48% <ø> (+0.05%)` | `0.00 <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...he/hudi/common/table/log/block/HoodieLogBlock.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL2xvZy9ibG9jay9Ib29kaWVMb2dCbG9jay5qYXZh) | | | |
   | [...org/apache/hudi/cli/commands/BootstrapCommand.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jbGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY2xpL2NvbW1hbmRzL0Jvb3RzdHJhcENvbW1hbmQuamF2YQ==) | | | |
   | [...ain/java/org/apache/hudi/avro/HoodieAvroUtils.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvYXZyby9Ib29kaWVBdnJvVXRpbHMuamF2YQ==) | | | |
   | [...ache/hudi/common/fs/SizeAwareDataOutputStream.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL2ZzL1NpemVBd2FyZURhdGFPdXRwdXRTdHJlYW0uamF2YQ==) | | | |
   | [...util/jvm/OpenJ9MemoryLayoutSpecification32bit.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3V0aWwvanZtL09wZW5KOU1lbW9yeUxheW91dFNwZWNpZmljYXRpb24zMmJpdC5qYXZh) | | | |
   | [...a/org/apache/hudi/common/util/CompactionUtils.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3V0aWwvQ29tcGFjdGlvblV0aWxzLmphdmE=) | | | |
   | [...n/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9odWRpL0hvb2RpZU1lcmdlT25SZWFkUkRELnNjYWxh) | | | |
   | [...i/hive/SlashEncodedDayPartitionValueExtractor.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zeW5jL2h1ZGktaGl2ZS1zeW5jL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9odWRpL2hpdmUvU2xhc2hFbmNvZGVkRGF5UGFydGl0aW9uVmFsdWVFeHRyYWN0b3IuamF2YQ==) | | | |
   | [...ache/hudi/exception/CorruptedLogFileException.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhjZXB0aW9uL0NvcnJ1cHRlZExvZ0ZpbGVFeGNlcHRpb24uamF2YQ==) | | | |
   | [...org/apache/hudi/common/table/log/AppendResult.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL2xvZy9BcHBlbmRSZXN1bHQuamF2YQ==) | | | |
   | ... and [356 more](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree-more) | |
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766593559


   cc @garyli1019 mind taking a first pass at this PR? :) 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-774528989


   >how can we know the max commit_seq_no in the commit
   
   I think we should do what would be done for Kafka's case. or just use an accumulator to obtain this on each commit? Either way, lets file a follow up JIRA to allow record level streaming? We can do it in a follow up 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] ze-engineering-code-challenge commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
ze-engineering-code-challenge commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-781758598


   I have a curiosity, what will happen if I recreate the source table of streaming?
   
   For example, I have a tableA and a streaming using tableA as source and tableB as a sink, for any needs I reprocessed tableA, when Spark streaming try to read tableA, what will happen?
   
   I will try.
   
   @pengzhiwei2018 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r571518809



##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
+import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
+
+case class HoodieSourceOffset(commitTime: String) extends Offset {
+
+  override def json(): String = {
+    HoodieSourceOffset.toJson(this)
+  }
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case HoodieSourceOffset(otherCommitTime) =>
+        otherCommitTime == commitTime
+      case _=> false
+    }
+  }
+
+  override def hashCode(): Int = {
+    commitTime.hashCode
+  }
+}
+
+
+object HoodieSourceOffset {
+  val mapper = new ObjectMapper with ScalaObjectMapper
+  mapper.setSerializationInclusion(Include.NON_ABSENT)
+  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+  mapper.registerModule(DefaultScalaModule)
+
+  def toJson(offset: HoodieSourceOffset): String = {
+    mapper.writeValueAsString(offset)
+  }
+
+  def fromJson(json: String): HoodieSourceOffset = {
+    mapper.readValue[HoodieSourceOffset](json)
+  }
+
+  def apply(offset: Offset): HoodieSourceOffset = {
+    offset match {
+      case SerializedOffset(json) => fromJson(json)
+      case o: HoodieSourceOffset => o
+    }
+  }
+
+  val INIT_OFFSET = HoodieSourceOffset("000")

Review comment:
       Agree!

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include

Review comment:
       Thanks for your advice! Yes, currently it is much simple. But in the long term, if we introduce the `commit_seq_no` or other infos to the offset, We may need the json parser. And the jackson dependency is already in the dependency of spark. So I prefer to keep.

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieStreamSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+  * The Struct Stream Source for Hudi to consume the data by streaming job.
+  * @param sqlContext
+  * @param metadataPath
+  * @param schemaOption
+  * @param parameters
+  */
+class HoodieStreamSource(
+    sqlContext: SQLContext,
+    metadataPath: String,
+    schemaOption: Option[StructType],
+    parameters: Map[String, String])
+  extends Source with Logging with Serializable {
+
+  @transient private val hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+  private lazy val tablePath: Path = {
+    val path = new Path(parameters.getOrElse("path", "Missing 'path' option"))
+    val fs = path.getFileSystem(hadoopConf)
+    TablePathUtils.getTablePath(fs, path).get()
+  }
+  @transient private lazy val metaClient = new HoodieTableMetaClient(hadoopConf, tablePath.toString)

Review comment:
       ok!

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils

Review comment:
       Thanks for the advice! 

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkRowEncoder.java
##########
@@ -23,6 +23,8 @@
 
 import java.io.Serializable;
 
-public interface SparkRowDeserializer extends Serializable {
+public interface SparkRowEncoder extends Serializable {

Review comment:
       Agree this!

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieStreamSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+  * The Struct Stream Source for Hudi to consume the data by streaming job.
+  * @param sqlContext
+  * @param metadataPath
+  * @param schemaOption
+  * @param parameters
+  */
+class HoodieStreamSource(
+    sqlContext: SQLContext,
+    metadataPath: String,
+    schemaOption: Option[StructType],
+    parameters: Map[String, String])
+  extends Source with Logging with Serializable {
+
+  @transient private val hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+  private lazy val tablePath: Path = {
+    val path = new Path(parameters.getOrElse("path", "Missing 'path' option"))
+    val fs = path.getFileSystem(hadoopConf)
+    TablePathUtils.getTablePath(fs, path).get()
+  }
+  @transient private lazy val metaClient = new HoodieTableMetaClient(hadoopConf, tablePath.toString)
+  private lazy val tableType = metaClient.getTableType
+
+  @transient private var lastOffset: HoodieSourceOffset = _
+  @transient private lazy val initialPartitionOffsets = {
+    val metadataLog =
+      new HDFSMetadataLog[HoodieSourceOffset](sqlContext.sparkSession, metadataPath) {
+        override def serialize(metadata: HoodieSourceOffset, out: OutputStream): Unit = {
+          val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+          writer.write("v" + VERSION + "\n")
+          writer.write(metadata.json)
+          writer.flush()
+        }
+
+        override def deserialize(in: InputStream): HoodieSourceOffset = {
+          val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
+
+          val firstLineEnd = content.indexOf("\n")
+          if (firstLineEnd > 0) {
+            val version = getVersion(content.substring(0, firstLineEnd))
+            if (version > VERSION) {
+              throw new IllegalStateException(s"UnSupportVersion: max support version is: $VERSION" +
+                s" current version is: $version")
+            }
+            HoodieSourceOffset.fromJson(content.substring(firstLineEnd + 1))
+          } else {
+            throw new IllegalStateException(s"Bad metadata format, failed to find the version line.")
+          }
+        }
+      }
+    metadataLog.get(0).getOrElse {
+      metadataLog.add(0, INIT_OFFSET)
+      INIT_OFFSET
+    }
+  }
+
+  private def getVersion(versionLine: String): Int = {
+    if (versionLine.startsWith("v")) {
+      versionLine.substring(1).toInt
+    } else {
+      throw new IllegalStateException(s"Illegal version line: $versionLine " +
+        s"in the streaming metadata path")
+    }
+  }
+
+  override def schema: StructType = {
+    schemaOption.getOrElse {
+      val schemaUtil = new TableSchemaResolver(metaClient)
+      SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
+        .dataType.asInstanceOf[StructType]
+    }
+  }
+
+  override def getOffset: Option[Offset] = {

Review comment:
       Yes, getOffset is not a meaningful method name. However it is defined in the spark interface `Source`. We can not rename it but can add some comments for it.

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieStreamSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+  * The Struct Stream Source for Hudi to consume the data by streaming job.
+  * @param sqlContext
+  * @param metadataPath
+  * @param schemaOption
+  * @param parameters
+  */
+class HoodieStreamSource(
+    sqlContext: SQLContext,
+    metadataPath: String,
+    schemaOption: Option[StructType],
+    parameters: Map[String, String])
+  extends Source with Logging with Serializable {
+
+  @transient private val hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+  private lazy val tablePath: Path = {
+    val path = new Path(parameters.getOrElse("path", "Missing 'path' option"))
+    val fs = path.getFileSystem(hadoopConf)
+    TablePathUtils.getTablePath(fs, path).get()
+  }
+  @transient private lazy val metaClient = new HoodieTableMetaClient(hadoopConf, tablePath.toString)
+  private lazy val tableType = metaClient.getTableType
+
+  @transient private var lastOffset: HoodieSourceOffset = _
+  @transient private lazy val initialPartitionOffsets = {
+    val metadataLog =
+      new HDFSMetadataLog[HoodieSourceOffset](sqlContext.sparkSession, metadataPath) {
+        override def serialize(metadata: HoodieSourceOffset, out: OutputStream): Unit = {
+          val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+          writer.write("v" + VERSION + "\n")
+          writer.write(metadata.json)
+          writer.flush()
+        }
+
+        override def deserialize(in: InputStream): HoodieSourceOffset = {
+          val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
+
+          val firstLineEnd = content.indexOf("\n")
+          if (firstLineEnd > 0) {
+            val version = getVersion(content.substring(0, firstLineEnd))
+            if (version > VERSION) {
+              throw new IllegalStateException(s"UnSupportVersion: max support version is: $VERSION" +
+                s" current version is: $version")
+            }
+            HoodieSourceOffset.fromJson(content.substring(firstLineEnd + 1))
+          } else {
+            throw new IllegalStateException(s"Bad metadata format, failed to find the version line.")
+          }
+        }
+      }
+    metadataLog.get(0).getOrElse {

Review comment:
       Agree!

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieStreamSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+  * The Struct Stream Source for Hudi to consume the data by streaming job.
+  * @param sqlContext
+  * @param metadataPath
+  * @param schemaOption
+  * @param parameters
+  */
+class HoodieStreamSource(
+    sqlContext: SQLContext,
+    metadataPath: String,
+    schemaOption: Option[StructType],
+    parameters: Map[String, String])
+  extends Source with Logging with Serializable {
+
+  @transient private val hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+  private lazy val tablePath: Path = {
+    val path = new Path(parameters.getOrElse("path", "Missing 'path' option"))
+    val fs = path.getFileSystem(hadoopConf)
+    TablePathUtils.getTablePath(fs, path).get()
+  }
+  @transient private lazy val metaClient = new HoodieTableMetaClient(hadoopConf, tablePath.toString)
+  private lazy val tableType = metaClient.getTableType
+
+  @transient private var lastOffset: HoodieSourceOffset = _
+  @transient private lazy val initialPartitionOffsets = {
+    val metadataLog =
+      new HDFSMetadataLog[HoodieSourceOffset](sqlContext.sparkSession, metadataPath) {
+        override def serialize(metadata: HoodieSourceOffset, out: OutputStream): Unit = {
+          val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+          writer.write("v" + VERSION + "\n")
+          writer.write(metadata.json)
+          writer.flush()
+        }
+
+        override def deserialize(in: InputStream): HoodieSourceOffset = {
+          val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
+
+          val firstLineEnd = content.indexOf("\n")
+          if (firstLineEnd > 0) {
+            val version = getVersion(content.substring(0, firstLineEnd))
+            if (version > VERSION) {
+              throw new IllegalStateException(s"UnSupportVersion: max support version is: $VERSION" +
+                s" current version is: $version")
+            }
+            HoodieSourceOffset.fromJson(content.substring(firstLineEnd + 1))
+          } else {
+            throw new IllegalStateException(s"Bad metadata format, failed to find the version line.")
+          }
+        }
+      }
+    metadataLog.get(0).getOrElse {
+      metadataLog.add(0, INIT_OFFSET)
+      INIT_OFFSET
+    }
+  }
+
+  private def getVersion(versionLine: String): Int = {
+    if (versionLine.startsWith("v")) {
+      versionLine.substring(1).toInt
+    } else {
+      throw new IllegalStateException(s"Illegal version line: $versionLine " +
+        s"in the streaming metadata path")
+    }
+  }
+
+  override def schema: StructType = {
+    schemaOption.getOrElse {
+      val schemaUtil = new TableSchemaResolver(metaClient)
+      SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
+        .dataType.asInstanceOf[StructType]
+    }
+  }
+
+  override def getOffset: Option[Offset] = {
+    initialPartitionOffsets

Review comment:
       Yes, It can be removed.

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieStreamSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+  * The Struct Stream Source for Hudi to consume the data by streaming job.
+  * @param sqlContext
+  * @param metadataPath
+  * @param schemaOption
+  * @param parameters
+  */
+class HoodieStreamSource(
+    sqlContext: SQLContext,
+    metadataPath: String,
+    schemaOption: Option[StructType],
+    parameters: Map[String, String])
+  extends Source with Logging with Serializable {
+
+  @transient private val hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+  private lazy val tablePath: Path = {
+    val path = new Path(parameters.getOrElse("path", "Missing 'path' option"))
+    val fs = path.getFileSystem(hadoopConf)
+    TablePathUtils.getTablePath(fs, path).get()
+  }
+  @transient private lazy val metaClient = new HoodieTableMetaClient(hadoopConf, tablePath.toString)
+  private lazy val tableType = metaClient.getTableType
+
+  @transient private var lastOffset: HoodieSourceOffset = _
+  @transient private lazy val initialPartitionOffsets = {
+    val metadataLog =
+      new HDFSMetadataLog[HoodieSourceOffset](sqlContext.sparkSession, metadataPath) {
+        override def serialize(metadata: HoodieSourceOffset, out: OutputStream): Unit = {
+          val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+          writer.write("v" + VERSION + "\n")
+          writer.write(metadata.json)
+          writer.flush()
+        }
+
+        override def deserialize(in: InputStream): HoodieSourceOffset = {
+          val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))

Review comment:
       Thank you for advice!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r565128254



##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+  * Struct Stream Source for Hudi.

Review comment:
       All right! I will write more for this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] codecov-io edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766519181






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] codecov-io edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766519181


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=h1) Report
   > Merging [#2485](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=desc) (56a0ae1) into [master](https://codecov.io/gh/apache/hudi/commit/5d053b495b8cb44cce88a67e82cbdfdc3d8b3180?el=desc) (5d053b4) will **increase** coverage by `1.22%`.
   > The diff coverage is `64.51%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2485/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #2485      +/-   ##
   ============================================
   + Coverage     49.78%   51.01%   +1.22%     
   - Complexity     3089     3197     +108     
   ============================================
     Files           430      435       +5     
     Lines         19566    19925     +359     
     Branches       2004     2047      +43     
   ============================================
   + Hits           9741    10164     +423     
   + Misses         9033     8925     -108     
   - Partials        792      836      +44     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `36.90% <ø> (-0.31%)` | `0.00 <ø> (ø)` | |
   | hudiclient | `100.00% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudicommon | `51.38% <ø> (-0.12%)` | `0.00 <ø> (ø)` | |
   | hudiflink | `43.21% <ø> (+10.17%)` | `0.00 <ø> (ø)` | |
   | hudihadoopmr | `33.16% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudisparkdatasource | `69.48% <64.51%> (-0.03%)` | `0.00 <27.00> (ø)` | |
   | hudisync | `48.61% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | huditimelineservice | `66.49% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudiutilities | `69.46% <ø> (+7.60%)` | `0.00 <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...rc/main/scala/org/apache/hudi/Spark2RowSerDe.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3BhcmsyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvaHVkaS9TcGFyazJSb3dTZXJEZS5zY2FsYQ==) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [...rc/main/scala/org/apache/hudi/Spark3RowSerDe.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3BhcmszL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvaHVkaS9TcGFyazNSb3dTZXJEZS5zY2FsYQ==) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [.../spark/sql/hudi/streaming/HoodieSourceOffset.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9zcGFyay9zcWwvaHVkaS9zdHJlYW1pbmcvSG9vZGllU291cmNlT2Zmc2V0LnNjYWxh) | `63.15% <63.15%> (ø)` | `4.00 <4.00> (?)` | |
   | [...src/main/scala/org/apache/hudi/DefaultSource.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9odWRpL0RlZmF1bHRTb3VyY2Uuc2NhbGE=) | `84.21% <64.28%> (-4.50%)` | `17.00 <2.00> (+2.00)` | :arrow_down: |
   | [.../main/scala/org/apache/hudi/HoodieSparkUtils.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9odWRpL0hvb2RpZVNwYXJrVXRpbHMuc2NhbGE=) | `88.88% <66.66%> (ø)` | `0.00 <0.00> (ø)` | |
   | [.../spark/sql/hudi/streaming/HoodieStreamSource.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9zcGFyay9zcWwvaHVkaS9zdHJlYW1pbmcvSG9vZGllU3RyZWFtU291cmNlLnNjYWxh) | `68.67% <68.67%> (ø)` | `21.00 <21.00> (?)` | |
   | [...ache/hudi/common/fs/inline/InMemoryFileSystem.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL2ZzL2lubGluZS9Jbk1lbW9yeUZpbGVTeXN0ZW0uamF2YQ==) | `79.31% <0.00%> (-10.35%)` | `15.00% <0.00%> (-1.00%)` | |
   | [...a/org/apache/hudi/cli/commands/CommitsCommand.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jbGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY2xpL2NvbW1hbmRzL0NvbW1pdHNDb21tYW5kLmphdmE=) | `53.50% <0.00%> (-5.30%)` | `15.00% <0.00%> (ø%)` | |
   | [...pache/hudi/common/table/HoodieTableMetaClient.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL0hvb2RpZVRhYmxlTWV0YUNsaWVudC5qYXZh) | `67.42% <0.00%> (-3.27%)` | `45.00% <0.00%> (ø%)` | |
   | [.../org/apache/hudi/MergeOnReadSnapshotRelation.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9odWRpL01lcmdlT25SZWFkU25hcHNob3RSZWxhdGlvbi5zY2FsYQ==) | `89.13% <0.00%> (-1.46%)` | `17.00% <0.00%> (+1.00%)` | :arrow_down: |
   | ... and [29 more](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree-more) | |
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r565127355



##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+  * Struct Stream Source for Hudi.
+  * @param sqlContext
+  * @param metadataPath
+  * @param schemaOption
+  * @param parameters
+  */
+class HoodieSource(

Review comment:
       Thanks for your suggestion. Maybe HoodieStreamSource is suitable.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pengzhiwei2018 commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-781765517


   > I have a curiosity, what will happen if I recreate the source table of streaming?
   > 
   > For example, I have a tableA and a streaming using tableA as source and tableB as a sink, for any needs I reprocessed tableA, when Spark streaming try to read tableA, what will happen?
   > 
   > I will try.
   > 
   > @pengzhiwei2018
   
   Hi @rubenssoto ,do you mean multiple stream jobs consume the same table simultaneously?Yeah, it does not matter with that. Each stream job keeps its own offset state. So the consumers do not influence each other.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r565127355



##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+  * Struct Stream Source for Hudi.
+  * @param sqlContext
+  * @param metadataPath
+  * @param schemaOption
+  * @param parameters
+  */
+class HoodieSource(

Review comment:
       Thanks for your suggestion. Maybe `HoodieStreamSource` is suitable.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] zhedoubushishi commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
zhedoubushishi commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-767810591


   > @pengzhiwei2018 thanks for your contribution. Left some comments but I am not quite familiar with Structured streaming. @zhedoubushishi mind taking a pass as well?
   
   Sure will take a look.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] codecov-io edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766519181


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=h1) Report
   > Merging [#2485](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=desc) (8d2ff66) into [master](https://codecov.io/gh/apache/hudi/commit/e302c6bc12c7eb764781898fdee8ee302ef4ec10?el=desc) (e302c6b) will **increase** coverage by `19.24%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2485/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree)
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #2485       +/-   ##
   =============================================
   + Coverage     50.18%   69.43%   +19.24%     
   + Complexity     3050      357     -2693     
   =============================================
     Files           419       53      -366     
     Lines         18931     1930    -17001     
     Branches       1948      230     -1718     
   =============================================
   - Hits           9500     1340     -8160     
   + Misses         8656      456     -8200     
   + Partials        775      134      -641     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `?` | `?` | |
   | hudiclient | `?` | `?` | |
   | hudicommon | `?` | `?` | |
   | hudiflink | `?` | `?` | |
   | hudihadoopmr | `?` | `?` | |
   | hudisparkdatasource | `?` | `?` | |
   | hudisync | `?` | `?` | |
   | huditimelineservice | `?` | `?` | |
   | hudiutilities | `69.43% <ø> (ø)` | `0.00 <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...apache/hudi/common/engine/TaskContextSupplier.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL2VuZ2luZS9UYXNrQ29udGV4dFN1cHBsaWVyLmphdmE=) | | | |
   | [...a/org/apache/hudi/cli/commands/CommitsCommand.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jbGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY2xpL2NvbW1hbmRzL0NvbW1pdHNDb21tYW5kLmphdmE=) | | | |
   | [...pache/hudi/hadoop/HoodieColumnProjectionUtils.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1oYWRvb3AtbXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvaGFkb29wL0hvb2RpZUNvbHVtblByb2plY3Rpb25VdGlscy5qYXZh) | | | |
   | [.../apache/hudi/common/config/SerializableSchema.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL2NvbmZpZy9TZXJpYWxpemFibGVTY2hlbWEuamF2YQ==) | | | |
   | [...apache/hudi/common/fs/inline/InLineFileSystem.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL2ZzL2lubGluZS9JbkxpbmVGaWxlU3lzdGVtLmphdmE=) | | | |
   | [...apache/hudi/common/util/collection/RocksDBDAO.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3V0aWwvY29sbGVjdGlvbi9Sb2Nrc0RCREFPLmphdmE=) | | | |
   | [...common/table/view/AbstractTableFileSystemView.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL3ZpZXcvQWJzdHJhY3RUYWJsZUZpbGVTeXN0ZW1WaWV3LmphdmE=) | | | |
   | [.../apache/hudi/exception/TableNotFoundException.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhjZXB0aW9uL1RhYmxlTm90Rm91bmRFeGNlcHRpb24uamF2YQ==) | | | |
   | [.../apache/hudi/common/fs/ConsistencyGuardConfig.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL2ZzL0NvbnNpc3RlbmN5R3VhcmRDb25maWcuamF2YQ==) | | | |
   | [...va/org/apache/hudi/common/model/CleanFileInfo.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL21vZGVsL0NsZWFuRmlsZUluZm8uamF2YQ==) | | | |
   | ... and [355 more](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree-more) | |
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] endgab commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
endgab commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-1061545712


   Hello, I haven't found any official documentation about this feature. Is there any reason for that? Can you confirm that it is going to be maintained in the future?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] codecov-io edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766519181


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=h1) Report
   > Merging [#2485](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=desc) (fa0056e) into [master](https://codecov.io/gh/apache/hudi/commit/e302c6bc12c7eb764781898fdee8ee302ef4ec10?el=desc) (e302c6b) will **decrease** coverage by `0.15%`.
   > The diff coverage is `67.52%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2485/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #2485      +/-   ##
   ============================================
   - Coverage     50.18%   50.02%   -0.16%     
   + Complexity     3050     2895     -155     
   ============================================
     Files           419      400      -19     
     Lines         18931    17614    -1317     
     Branches       1948     1829     -119     
   ============================================
   - Hits           9500     8812     -688     
   + Misses         8656     8074     -582     
   + Partials        775      728      -47     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `37.21% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudiclient | `100.00% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudicommon | `51.47% <ø> (-0.03%)` | `0.00 <ø> (ø)` | |
   | hudiflink | `0.00% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudihadoopmr | `33.16% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudisparkdatasource | `66.05% <67.52%> (+0.19%)` | `0.00 <27.00> (ø)` | |
   | hudisync | `?` | `?` | |
   | huditimelineservice | `?` | `?` | |
   | hudiutilities | `69.43% <ø> (ø)` | `0.00 <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [.../spark/sql/hudi/streaming/HoodieSourceOffset.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9zcGFyay9zcWwvaHVkaS9zdHJlYW1pbmcvSG9vZGllU291cmNlT2Zmc2V0LnNjYWxh) | `63.15% <63.15%> (ø)` | `4.00 <4.00> (?)` | |
   | [...src/main/scala/org/apache/hudi/DefaultSource.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9odWRpL0RlZmF1bHRTb3VyY2Uuc2NhbGE=) | `84.21% <64.28%> (-4.50%)` | `17.00 <2.00> (+2.00)` | :arrow_down: |
   | [...apache/spark/sql/hudi/streaming/HoodieSource.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9zcGFyay9zcWwvaHVkaS9zdHJlYW1pbmcvSG9vZGllU291cmNlLnNjYWxh) | `69.04% <69.04%> (ø)` | `21.00 <21.00> (?)` | |
   | [...e/hudi/common/table/log/HoodieLogFormatWriter.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL2xvZy9Ib29kaWVMb2dGb3JtYXRXcml0ZXIuamF2YQ==) | `78.12% <0.00%> (-1.57%)` | `26.00% <0.00%> (ø%)` | |
   | [.../org/apache/hudi/hive/NonPartitionedExtractor.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zeW5jL2h1ZGktaGl2ZS1zeW5jL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9odWRpL2hpdmUvTm9uUGFydGl0aW9uZWRFeHRyYWN0b3IuamF2YQ==) | | | |
   | [...in/java/org/apache/hudi/hive/SchemaDifference.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zeW5jL2h1ZGktaGl2ZS1zeW5jL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9odWRpL2hpdmUvU2NoZW1hRGlmZmVyZW5jZS5qYXZh) | | | |
   | [...e/hudi/timeline/service/FileSystemViewHandler.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS10aW1lbGluZS1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9odWRpL3RpbWVsaW5lL3NlcnZpY2UvRmlsZVN5c3RlbVZpZXdIYW5kbGVyLmphdmE=) | | | |
   | [.../org/apache/hudi/hive/HoodieHiveSyncException.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zeW5jL2h1ZGktaGl2ZS1zeW5jL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9odWRpL2hpdmUvSG9vZGllSGl2ZVN5bmNFeGNlcHRpb24uamF2YQ==) | | | |
   | [...i/hive/SlashEncodedDayPartitionValueExtractor.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zeW5jL2h1ZGktaGl2ZS1zeW5jL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9odWRpL2hpdmUvU2xhc2hFbmNvZGVkRGF5UGFydGl0aW9uVmFsdWVFeHRyYWN0b3IuamF2YQ==) | | | |
   | [...c/main/java/org/apache/hudi/dla/DLASyncConfig.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zeW5jL2h1ZGktZGxhLXN5bmMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZGxhL0RMQVN5bmNDb25maWcuamF2YQ==) | | | |
   | ... and [18 more](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree-more) | |
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r569077912



##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
+import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}

Review comment:
       Thanks @zhedoubushishi for your advice. I will do fix this problem later.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] zhedoubushishi commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
zhedoubushishi commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r568854277



##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
+import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}

Review comment:
       If it's only compatible with Spark 2, we should leave it in ```hudi-spark2```. If it's compatible with both Spark2 and Spark3, we should leave it in ```hudi-spark```. 
   My understand is at least you need to make this change be able to compile with ```mvn packge -DskipTests -Dspark3```. Otherwise Hudi cannot compile with Spark 3 any more.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] rubenssoto commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
rubenssoto commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-781777638


   good to know...
   
   thank you @pengzhiwei2018 !!!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r566190350



##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
+import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}

Review comment:
       Thanks for your suggestion.I have test with -Dspark3. The `HoodieSourceOffset`  is not work for spark3.
    I have a puzzle that should we leave it in `hudi-spark` or `hudi-spark2`? Currently most of the Spark code stay in the `hudi-spark`. If  moving this to `hudi-spark2` now, many other relate code should also move. So I think we can create a JIRA to track this currently. In the long term, I will provide an implement for the spark3.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r566190350



##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
+import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}

Review comment:
       Thanks for your suggestion.I have test with -Dspark3. The `HoodieSourceOffset`  is not work for spark3.
    I have a puzzle that should we leave it in `hudi-spark` or `hudi-spark2`? Currently most of the Spark code stay in the `hudi-spark`. If  moving this to `hudi-spark2` now, many other relate code should also move. So I think we can create a JIRA to track this currently. In the long term, I will provide an implement for the spark3.
   Here is the Jira: [https://issues.apache.org/jira/browse/HUDI-1558](url)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pengzhiwei2018 commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-772986092


   > @pengzhiwei2018 I am planning to spend sometime on this as well.
   > 
   > High level question. does the `offset` for the streaming read map to `_hoodie_commit_seq_no` in this implementation. This way we can actually do record level streams and even resume where we left off.
   
   Hi @vinothchandar , you are welcome to join this. 
   Currently the `HoodieSourceOffset` just keep the `commitTime` . And every minBatch we consume the incremental data between **(lastCommitTime, currentCommitTime]**.   If it failed during the consuming, It will recovered from the offset state and continue  to consuming the data between  **(lastCommitTime, currentCommitTime]**. It is a commit level recovery.
   Introducing `_hoodie_commit_seq_no` to the `offset` may makes recovery more fine-grained to the record level. But the problem is how can we know the max `commit_seq_no` in the commit.  In the `getOffset` method, we must tell spark which `commit_seq_no` we will read to in the min batch. Currently in the hoodie meta data, we just record the commit time for each commit. So this is problem for slicing the offset.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] garyli1019 commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
garyli1019 commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r564535063



##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
##########
@@ -181,4 +184,35 @@ class DefaultSource extends RelationProvider
         .resolveRelation()
     }
   }
+
+  override def sourceSchema(sqlContext: SQLContext,
+                            schema: Option[StructType],
+                            providerName: String,
+                            parameters: Map[String, String]): (String, StructType) = {
+    val path = parameters.get("path")
+    if (path.isEmpty || path.get == null) {
+      throw new HoodieException(s"'path'  must be specified.")
+    }
+    val metaClient = new HoodieTableMetaClient(
+      sqlContext.sparkSession.sessionState.newHadoopConf(), path.get)
+    val schemaResolver = new TableSchemaResolver(metaClient)
+    val sqlSchema =
+      try {
+        val avroSchema = schemaResolver.getTableAvroSchema
+        SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]

Review comment:
       I think we should use the hudi internal converter from `AvroConversionUtils`, to be consistent with others.

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+  * Struct Stream Source for Hudi.
+  * @param sqlContext
+  * @param metadataPath
+  * @param schemaOption
+  * @param parameters
+  */
+class HoodieSource(

Review comment:
       More specific name? like `StructuredStreamingHoodieSource`?

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+  * Struct Stream Source for Hudi.
+  * @param sqlContext
+  * @param metadataPath
+  * @param schemaOption
+  * @param parameters
+  */
+class HoodieSource(
+    sqlContext: SQLContext,
+    metadataPath: String,
+    schemaOption: Option[StructType],
+    parameters: Map[String, String])
+  extends Source with Logging with Serializable {
+
+  @transient private val hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+  private lazy val tablePath: Path = {
+    val path = new Path(parameters.getOrElse("path", "Missing 'path' option"))
+    val fs = path.getFileSystem(hadoopConf)
+    TablePathUtils.getTablePath(fs, path).get()
+  }
+  @transient private lazy val metaClient = new HoodieTableMetaClient(hadoopConf, tablePath.toString)
+  private lazy val tableType = metaClient.getTableType
+
+  @transient private var lastOffset: HoodieSourceOffset = _
+  @transient private lazy val initialPartitionOffsets = {
+    val metadataLog =
+      new HDFSMetadataLog[HoodieSourceOffset](sqlContext.sparkSession, metadataPath) {
+        override def serialize(metadata: HoodieSourceOffset, out: OutputStream): Unit = {
+          val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+          writer.write("v" + VERSION + "\n")
+          writer.write(metadata.json)
+          writer.flush()
+        }
+
+        override def deserialize(in: InputStream): HoodieSourceOffset = {
+          val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
+
+          val firstLineEnd = content.indexOf("\n")
+          if (firstLineEnd > 0) {
+            val version = getVersion(content.substring(0, firstLineEnd))
+            if (version > VERSION) {
+              throw new IllegalStateException(s"UnSupportVersion: max support version is: $VERSION" +
+                s" current version is: $version")
+            }
+            HoodieSourceOffset.fromJson(content.substring(firstLineEnd + 1))
+          } else {
+            throw new IllegalStateException(s"Bad metadata format, failed to find the version line.")
+          }
+        }
+      }
+    metadataLog.get(0).getOrElse {
+      metadataLog.add(0, INIT_OFFSET)
+      INIT_OFFSET
+    }
+  }
+
+  private def getVersion(versionLine: String): Int = {
+    if (versionLine.startsWith("v")) {
+      versionLine.substring(1).toInt
+    } else {
+      throw new IllegalStateException(s"Illegal version line: $versionLine " +
+        s"in the streaming metadata path")
+    }
+  }
+
+  override def schema: StructType = {
+    schemaOption.getOrElse {
+      val schemaUtil = new TableSchemaResolver(metaClient)
+      SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
+        .dataType.asInstanceOf[StructType]
+    }
+  }
+
+  override def getOffset: Option[Offset] = {
+    initialPartitionOffsets
+
+    metaClient.reloadActiveTimeline()
+    val activeInstants = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
+    if (!activeInstants.empty()) {
+      val currentLatestCommitTime = activeInstants.lastInstant().get().getTimestamp
+      if (lastOffset == null || currentLatestCommitTime > lastOffset.commitTime) {
+        lastOffset = HoodieSourceOffset(currentLatestCommitTime)
+      }
+    } else { // if there are no active commits, use the init offset
+      lastOffset = initialPartitionOffsets
+    }
+    Some(lastOffset)
+  }
+
+  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+    initialPartitionOffsets
+
+    val startOffset = start.map(HoodieSourceOffset(_))
+      .getOrElse(initialPartitionOffsets)
+    val endOffset = HoodieSourceOffset(end)
+
+    if (startOffset == endOffset) {
+      sqlContext.internalCreateDataFrame(
+        sqlContext.sparkContext.emptyRDD[InternalRow].setName("empty"), schema, isStreaming = true)
+    } else {
+      // Consume the data between (startCommitTime, endCommitTime]
+      val incParams = parameters ++ Map(
+        DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY -> startCommitTime(startOffset),
+        DataSourceReadOptions.END_INSTANTTIME_OPT_KEY -> endOffset.commitTime
+      )
+
+      val rdd = tableType match {
+        case HoodieTableType.COPY_ON_WRITE =>
+          val encoder = RowEncoder(schema)
+          new IncrementalRelation(sqlContext, incParams, schema, metaClient)
+            .buildScan()
+            .map(encoder.toRow)
+        case HoodieTableType.MERGE_ON_READ =>
+          val requiredColumns = schema.fields.map(_.name)
+          new MergeOnReadIncrementalRelation(sqlContext, incParams, schema, metaClient)
+            .buildScan(requiredColumns, Array.empty[Filter])
+            .asInstanceOf[RDD[InternalRow]]
+        case _ => throw new IllegalArgumentException(s"UnSupport tableType: $tableType")
+      }
+      sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
+    }
+  }
+
+  private def startCommitTime(startOffset: HoodieSourceOffset): String = {
+    startOffset match {
+      case INIT_OFFSET => startOffset.commitTime
+      case HoodieSourceOffset(commitTime) =>
+        val time = HoodieActiveTimeline.COMMIT_FORMATTER.parse(commitTime).getTime
+        // As we consume the data between (start, end], start is not included,
+        // so we +1s to the start commit time here.
+        HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date(time + 1000))
+      case _=> throw new IllegalStateException("UnKnow offset type.")
+    }
+  }
+
+  override def stop(): Unit = {
+
+  }
+}
+
+object HoodieSource {
+  val VERSION = 1

Review comment:
       Initial version?

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+  * Struct Stream Source for Hudi.

Review comment:
       We could add more docs. This sounds like using structured streaming to sink to Hudi. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] codecov-io edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766519181


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=h1) Report
   > Merging [#2485](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=desc) (f9a4121) into [master](https://codecov.io/gh/apache/hudi/commit/e302c6bc12c7eb764781898fdee8ee302ef4ec10?el=desc) (e302c6b) will **decrease** coverage by `5.65%`.
   > The diff coverage is `67.52%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2485/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #2485      +/-   ##
   ============================================
   - Coverage     50.18%   44.52%   -5.66%     
   + Complexity     3050     2838     -212     
   ============================================
     Files           419      432      +13     
     Lines         18931    19682     +751     
     Branches       1948     2019      +71     
   ============================================
   - Hits           9500     8764     -736     
   - Misses         8656    10217    +1561     
   + Partials        775      701      -74     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `37.21% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudiclient | `100.00% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudicommon | `51.51% <ø> (+0.02%)` | `0.00 <ø> (ø)` | |
   | hudiflink | `33.03% <ø> (+33.03%)` | `0.00 <ø> (ø)` | |
   | hudihadoopmr | `33.16% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudisparkdatasource | `66.05% <67.52%> (+0.19%)` | `0.00 <27.00> (ø)` | |
   | hudisync | `48.61% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | huditimelineservice | `66.49% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudiutilities | `9.68% <ø> (-59.75%)` | `0.00 <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [.../spark/sql/hudi/streaming/HoodieSourceOffset.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9zcGFyay9zcWwvaHVkaS9zdHJlYW1pbmcvSG9vZGllU291cmNlT2Zmc2V0LnNjYWxh) | `63.15% <63.15%> (ø)` | `4.00 <4.00> (?)` | |
   | [...src/main/scala/org/apache/hudi/DefaultSource.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9odWRpL0RlZmF1bHRTb3VyY2Uuc2NhbGE=) | `84.21% <64.28%> (-4.50%)` | `17.00 <2.00> (+2.00)` | :arrow_down: |
   | [.../spark/sql/hudi/streaming/HoodieStreamSource.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9zcGFyay9zcWwvaHVkaS9zdHJlYW1pbmcvSG9vZGllU3RyZWFtU291cmNlLnNjYWxh) | `69.04% <69.04%> (ø)` | `21.00 <21.00> (?)` | |
   | [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
   | [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-10.00%)` | |
   | [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | ... and [55 more](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree-more) | |
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pengzhiwei2018 commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-781777083


   > @pengzhiwei2018
   > 
   > no, for example, today with spark structured streaming in a regular parquet if my tableA as a source to my streaming, if I reprocess/recreate tableA spark streaming will process all new files of my reprocessed tableA
   > 
   > if for any reason I need to recreate my tableA, what will happen to my streams?
   > 
   > I dont know if I make myself clear
   
   Hi @rubenssoto.
   If the table has recreated, the offset of the stream source should be reset(e.g. use another checkpoint directory or delete the old checkpoint directory). Otherwise, the old offset may not match the new recreated table and we cannot read data correctly.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r565128033



##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+  * Struct Stream Source for Hudi.
+  * @param sqlContext
+  * @param metadataPath
+  * @param schemaOption
+  * @param parameters
+  */
+class HoodieSource(
+    sqlContext: SQLContext,
+    metadataPath: String,
+    schemaOption: Option[StructType],
+    parameters: Map[String, String])
+  extends Source with Logging with Serializable {
+
+  @transient private val hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+  private lazy val tablePath: Path = {
+    val path = new Path(parameters.getOrElse("path", "Missing 'path' option"))
+    val fs = path.getFileSystem(hadoopConf)
+    TablePathUtils.getTablePath(fs, path).get()
+  }
+  @transient private lazy val metaClient = new HoodieTableMetaClient(hadoopConf, tablePath.toString)
+  private lazy val tableType = metaClient.getTableType
+
+  @transient private var lastOffset: HoodieSourceOffset = _
+  @transient private lazy val initialPartitionOffsets = {
+    val metadataLog =
+      new HDFSMetadataLog[HoodieSourceOffset](sqlContext.sparkSession, metadataPath) {
+        override def serialize(metadata: HoodieSourceOffset, out: OutputStream): Unit = {
+          val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+          writer.write("v" + VERSION + "\n")
+          writer.write(metadata.json)
+          writer.flush()
+        }
+
+        override def deserialize(in: InputStream): HoodieSourceOffset = {
+          val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
+
+          val firstLineEnd = content.indexOf("\n")
+          if (firstLineEnd > 0) {
+            val version = getVersion(content.substring(0, firstLineEnd))
+            if (version > VERSION) {
+              throw new IllegalStateException(s"UnSupportVersion: max support version is: $VERSION" +
+                s" current version is: $version")
+            }
+            HoodieSourceOffset.fromJson(content.substring(firstLineEnd + 1))
+          } else {
+            throw new IllegalStateException(s"Bad metadata format, failed to find the version line.")
+          }
+        }
+      }
+    metadataLog.get(0).getOrElse {
+      metadataLog.add(0, INIT_OFFSET)
+      INIT_OFFSET
+    }
+  }
+
+  private def getVersion(versionLine: String): Int = {
+    if (versionLine.startsWith("v")) {
+      versionLine.substring(1).toInt
+    } else {
+      throw new IllegalStateException(s"Illegal version line: $versionLine " +
+        s"in the streaming metadata path")
+    }
+  }
+
+  override def schema: StructType = {
+    schemaOption.getOrElse {
+      val schemaUtil = new TableSchemaResolver(metaClient)
+      SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
+        .dataType.asInstanceOf[StructType]
+    }
+  }
+
+  override def getOffset: Option[Offset] = {
+    initialPartitionOffsets
+
+    metaClient.reloadActiveTimeline()
+    val activeInstants = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
+    if (!activeInstants.empty()) {
+      val currentLatestCommitTime = activeInstants.lastInstant().get().getTimestamp
+      if (lastOffset == null || currentLatestCommitTime > lastOffset.commitTime) {
+        lastOffset = HoodieSourceOffset(currentLatestCommitTime)
+      }
+    } else { // if there are no active commits, use the init offset
+      lastOffset = initialPartitionOffsets
+    }
+    Some(lastOffset)
+  }
+
+  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+    initialPartitionOffsets
+
+    val startOffset = start.map(HoodieSourceOffset(_))
+      .getOrElse(initialPartitionOffsets)
+    val endOffset = HoodieSourceOffset(end)
+
+    if (startOffset == endOffset) {
+      sqlContext.internalCreateDataFrame(
+        sqlContext.sparkContext.emptyRDD[InternalRow].setName("empty"), schema, isStreaming = true)
+    } else {
+      // Consume the data between (startCommitTime, endCommitTime]
+      val incParams = parameters ++ Map(
+        DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY -> startCommitTime(startOffset),
+        DataSourceReadOptions.END_INSTANTTIME_OPT_KEY -> endOffset.commitTime
+      )
+
+      val rdd = tableType match {
+        case HoodieTableType.COPY_ON_WRITE =>
+          val encoder = RowEncoder(schema)
+          new IncrementalRelation(sqlContext, incParams, schema, metaClient)
+            .buildScan()
+            .map(encoder.toRow)
+        case HoodieTableType.MERGE_ON_READ =>
+          val requiredColumns = schema.fields.map(_.name)
+          new MergeOnReadIncrementalRelation(sqlContext, incParams, schema, metaClient)
+            .buildScan(requiredColumns, Array.empty[Filter])
+            .asInstanceOf[RDD[InternalRow]]
+        case _ => throw new IllegalArgumentException(s"UnSupport tableType: $tableType")
+      }
+      sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
+    }
+  }
+
+  private def startCommitTime(startOffset: HoodieSourceOffset): String = {
+    startOffset match {
+      case INIT_OFFSET => startOffset.commitTime
+      case HoodieSourceOffset(commitTime) =>
+        val time = HoodieActiveTimeline.COMMIT_FORMATTER.parse(commitTime).getTime
+        // As we consume the data between (start, end], start is not included,
+        // so we +1s to the start commit time here.
+        HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date(time + 1000))
+      case _=> throw new IllegalStateException("UnKnow offset type.")
+    }
+  }
+
+  override def stop(): Unit = {
+
+  }
+}
+
+object HoodieSource {
+  val VERSION = 1

Review comment:
       Yes, it is the first version number of this source.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r566168728



##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming

Review comment:
       Hi @zhedoubushishi ,there are some method like `sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)` is  private to other package except the `org.apache.spark.sql`. So I move the StreamSource under it. And I have referenced other struct streaming sources like `kafaka`, it also do the same thing.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pengzhiwei2018 edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-772986092


   > @pengzhiwei2018 I am planning to spend sometime on this as well.
   > 
   > High level question. does the `offset` for the streaming read map to `_hoodie_commit_seq_no` in this implementation. This way we can actually do record level streams and even resume where we left off.
   
   Hi @vinothchandar , welcome to join this. 
   Currently the `HoodieSourceOffset` just keep the `commitTime` . And every minBatch we consume the incremental data between `(lastCommitTime, currentCommitTime]`   If it failed during the consuming, It will recovered from the offset state and continue  to consuming the data between  `(lastCommitTime, currentCommitTime]`. It is a commit level recovery.
   Introducing `_hoodie_commit_seq_no` to the `offset` may makes recovery more fine-grained to the record level. But the problem is how can we know the max `commit_seq_no` in the commit.  In the `getOffset` method, we must tell spark which `commit_seq_no` we will read to in the min batch. Currently in the hoodie meta data, we just record the commit time for each commit. So this is problem for slicing the offset.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] codecov-io edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766519181






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] codecov-io edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766519181


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=h1) Report
   > Merging [#2485](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=desc) (cb80c4f) into [master](https://codecov.io/gh/apache/hudi/commit/5d053b495b8cb44cce88a67e82cbdfdc3d8b3180?el=desc) (5d053b4) will **increase** coverage by `19.64%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2485/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree)
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #2485       +/-   ##
   =============================================
   + Coverage     49.78%   69.43%   +19.64%     
   + Complexity     3089      357     -2732     
   =============================================
     Files           430       53      -377     
     Lines         19566     1930    -17636     
     Branches       2004      230     -1774     
   =============================================
   - Hits           9741     1340     -8401     
   + Misses         9033      456     -8577     
   + Partials        792      134      -658     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `?` | `?` | |
   | hudiclient | `?` | `?` | |
   | hudicommon | `?` | `?` | |
   | hudiflink | `?` | `?` | |
   | hudihadoopmr | `?` | `?` | |
   | hudisparkdatasource | `?` | `?` | |
   | hudisync | `?` | `?` | |
   | huditimelineservice | `?` | `?` | |
   | hudiutilities | `69.43% <ø> (+7.56%)` | `0.00 <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...g/apache/hudi/cli/utils/SparkTempViewProvider.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jbGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY2xpL3V0aWxzL1NwYXJrVGVtcFZpZXdQcm92aWRlci5qYXZh) | | | |
   | [...ain/java/org/apache/hudi/cli/utils/CommitUtil.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jbGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY2xpL3V0aWxzL0NvbW1pdFV0aWwuamF2YQ==) | | | |
   | [...e/hudi/common/engine/HoodieLocalEngineContext.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL2VuZ2luZS9Ib29kaWVMb2NhbEVuZ2luZUNvbnRleHQuamF2YQ==) | | | |
   | [...a/org/apache/hudi/streamer/OperationConverter.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1mbGluay9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaHVkaS9zdHJlYW1lci9PcGVyYXRpb25Db252ZXJ0ZXIuamF2YQ==) | | | |
   | [...di/common/table/log/block/HoodieAvroDataBlock.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL2xvZy9ibG9jay9Ib29kaWVBdnJvRGF0YUJsb2NrLmphdmE=) | | | |
   | [...ava/org/apache/hudi/common/model/HoodieRecord.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL21vZGVsL0hvb2RpZVJlY29yZC5qYXZh) | | | |
   | [...ache/hudi/common/util/collection/DiskBasedMap.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3V0aWwvY29sbGVjdGlvbi9EaXNrQmFzZWRNYXAuamF2YQ==) | | | |
   | [...src/main/java/org/apache/hudi/cli/TableHeader.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jbGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY2xpL1RhYmxlSGVhZGVyLmphdmE=) | | | |
   | [...g/apache/hudi/common/table/HoodieTableVersion.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL0hvb2RpZVRhYmxlVmVyc2lvbi5qYXZh) | | | |
   | [.../versioning/clean/CleanPlanV2MigrationHandler.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL3RpbWVsaW5lL3ZlcnNpb25pbmcvY2xlYW4vQ2xlYW5QbGFuVjJNaWdyYXRpb25IYW5kbGVyLmphdmE=) | | | |
   | ... and [371 more](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree-more) | |
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r571478862



##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include

Review comment:
       this is very simple right, can we just hand format the json without the jackson dependency? just a thought. leave it to you

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
+import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
+
+case class HoodieSourceOffset(commitTime: String) extends Offset {
+
+  override def json(): String = {
+    HoodieSourceOffset.toJson(this)
+  }
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case HoodieSourceOffset(otherCommitTime) =>
+        otherCommitTime == commitTime
+      case _=> false
+    }
+  }
+
+  override def hashCode(): Int = {
+    commitTime.hashCode
+  }
+}
+
+
+object HoodieSourceOffset {
+  val mapper = new ObjectMapper with ScalaObjectMapper
+  mapper.setSerializationInclusion(Include.NON_ABSENT)
+  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+  mapper.registerModule(DefaultScalaModule)
+
+  def toJson(offset: HoodieSourceOffset): String = {
+    mapper.writeValueAsString(offset)
+  }
+
+  def fromJson(json: String): HoodieSourceOffset = {
+    mapper.readValue[HoodieSourceOffset](json)
+  }
+
+  def apply(offset: Offset): HoodieSourceOffset = {
+    offset match {
+      case SerializedOffset(json) => fromJson(json)
+      case o: HoodieSourceOffset => o
+    }
+  }
+
+  val INIT_OFFSET = HoodieSourceOffset("000")

Review comment:
       should/can we reuse HoodieTimeline#INIT_INSTANT_TS  ?

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieStreamSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+  * The Struct Stream Source for Hudi to consume the data by streaming job.
+  * @param sqlContext
+  * @param metadataPath
+  * @param schemaOption
+  * @param parameters
+  */
+class HoodieStreamSource(
+    sqlContext: SQLContext,
+    metadataPath: String,
+    schemaOption: Option[StructType],
+    parameters: Map[String, String])
+  extends Source with Logging with Serializable {
+
+  @transient private val hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+  private lazy val tablePath: Path = {
+    val path = new Path(parameters.getOrElse("path", "Missing 'path' option"))
+    val fs = path.getFileSystem(hadoopConf)
+    TablePathUtils.getTablePath(fs, path).get()
+  }
+  @transient private lazy val metaClient = new HoodieTableMetaClient(hadoopConf, tablePath.toString)
+  private lazy val tableType = metaClient.getTableType
+
+  @transient private var lastOffset: HoodieSourceOffset = _
+  @transient private lazy val initialPartitionOffsets = {
+    val metadataLog =
+      new HDFSMetadataLog[HoodieSourceOffset](sqlContext.sparkSession, metadataPath) {
+        override def serialize(metadata: HoodieSourceOffset, out: OutputStream): Unit = {
+          val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+          writer.write("v" + VERSION + "\n")
+          writer.write(metadata.json)
+          writer.flush()
+        }
+
+        override def deserialize(in: InputStream): HoodieSourceOffset = {
+          val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
+
+          val firstLineEnd = content.indexOf("\n")
+          if (firstLineEnd > 0) {
+            val version = getVersion(content.substring(0, firstLineEnd))
+            if (version > VERSION) {
+              throw new IllegalStateException(s"UnSupportVersion: max support version is: $VERSION" +
+                s" current version is: $version")
+            }
+            HoodieSourceOffset.fromJson(content.substring(firstLineEnd + 1))
+          } else {
+            throw new IllegalStateException(s"Bad metadata format, failed to find the version line.")
+          }
+        }
+      }
+    metadataLog.get(0).getOrElse {
+      metadataLog.add(0, INIT_OFFSET)
+      INIT_OFFSET
+    }
+  }
+
+  private def getVersion(versionLine: String): Int = {
+    if (versionLine.startsWith("v")) {
+      versionLine.substring(1).toInt
+    } else {
+      throw new IllegalStateException(s"Illegal version line: $versionLine " +
+        s"in the streaming metadata path")
+    }
+  }
+
+  override def schema: StructType = {
+    schemaOption.getOrElse {
+      val schemaUtil = new TableSchemaResolver(metaClient)
+      SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
+        .dataType.asInstanceOf[StructType]
+    }
+  }
+
+  override def getOffset: Option[Offset] = {
+    initialPartitionOffsets
+
+    metaClient.reloadActiveTimeline()
+    val activeInstants = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
+    if (!activeInstants.empty()) {
+      val currentLatestCommitTime = activeInstants.lastInstant().get().getTimestamp
+      if (lastOffset == null || currentLatestCommitTime > lastOffset.commitTime) {
+        lastOffset = HoodieSourceOffset(currentLatestCommitTime)
+      }
+    } else { // if there are no active commits, use the init offset
+      lastOffset = initialPartitionOffsets
+    }
+    Some(lastOffset)
+  }
+
+  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+    initialPartitionOffsets
+
+    val startOffset = start.map(HoodieSourceOffset(_))
+      .getOrElse(initialPartitionOffsets)
+    val endOffset = HoodieSourceOffset(end)
+
+    if (startOffset == endOffset) {
+      sqlContext.internalCreateDataFrame(

Review comment:
       nice.

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieStreamSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+  * The Struct Stream Source for Hudi to consume the data by streaming job.
+  * @param sqlContext
+  * @param metadataPath
+  * @param schemaOption
+  * @param parameters
+  */
+class HoodieStreamSource(
+    sqlContext: SQLContext,
+    metadataPath: String,
+    schemaOption: Option[StructType],
+    parameters: Map[String, String])
+  extends Source with Logging with Serializable {
+
+  @transient private val hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+  private lazy val tablePath: Path = {
+    val path = new Path(parameters.getOrElse("path", "Missing 'path' option"))
+    val fs = path.getFileSystem(hadoopConf)
+    TablePathUtils.getTablePath(fs, path).get()
+  }
+  @transient private lazy val metaClient = new HoodieTableMetaClient(hadoopConf, tablePath.toString)
+  private lazy val tableType = metaClient.getTableType
+
+  @transient private var lastOffset: HoodieSourceOffset = _
+  @transient private lazy val initialPartitionOffsets = {
+    val metadataLog =
+      new HDFSMetadataLog[HoodieSourceOffset](sqlContext.sparkSession, metadataPath) {
+        override def serialize(metadata: HoodieSourceOffset, out: OutputStream): Unit = {
+          val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+          writer.write("v" + VERSION + "\n")
+          writer.write(metadata.json)
+          writer.flush()
+        }
+
+        override def deserialize(in: InputStream): HoodieSourceOffset = {
+          val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
+
+          val firstLineEnd = content.indexOf("\n")
+          if (firstLineEnd > 0) {
+            val version = getVersion(content.substring(0, firstLineEnd))
+            if (version > VERSION) {
+              throw new IllegalStateException(s"UnSupportVersion: max support version is: $VERSION" +
+                s" current version is: $version")
+            }
+            HoodieSourceOffset.fromJson(content.substring(firstLineEnd + 1))
+          } else {
+            throw new IllegalStateException(s"Bad metadata format, failed to find the version line.")
+          }
+        }
+      }
+    metadataLog.get(0).getOrElse {
+      metadataLog.add(0, INIT_OFFSET)
+      INIT_OFFSET
+    }
+  }
+
+  private def getVersion(versionLine: String): Int = {
+    if (versionLine.startsWith("v")) {
+      versionLine.substring(1).toInt
+    } else {
+      throw new IllegalStateException(s"Illegal version line: $versionLine " +
+        s"in the streaming metadata path")
+    }
+  }
+
+  override def schema: StructType = {
+    schemaOption.getOrElse {
+      val schemaUtil = new TableSchemaResolver(metaClient)
+      SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
+        .dataType.asInstanceOf[StructType]
+    }
+  }
+
+  override def getOffset: Option[Offset] = {
+    initialPartitionOffsets

Review comment:
       could nt this be done only lazily in the else block? i.e remove this line?

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkRowEncoder.java
##########
@@ -23,6 +23,8 @@
 
 import java.io.Serializable;
 
-public interface SparkRowDeserializer extends Serializable {
+public interface SparkRowEncoder extends Serializable {

Review comment:
       may be `SparkRowSerDe`  is an apt name?

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieStreamSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+  * The Struct Stream Source for Hudi to consume the data by streaming job.
+  * @param sqlContext
+  * @param metadataPath
+  * @param schemaOption
+  * @param parameters
+  */
+class HoodieStreamSource(
+    sqlContext: SQLContext,
+    metadataPath: String,
+    schemaOption: Option[StructType],
+    parameters: Map[String, String])
+  extends Source with Logging with Serializable {
+
+  @transient private val hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+  private lazy val tablePath: Path = {
+    val path = new Path(parameters.getOrElse("path", "Missing 'path' option"))
+    val fs = path.getFileSystem(hadoopConf)
+    TablePathUtils.getTablePath(fs, path).get()
+  }
+  @transient private lazy val metaClient = new HoodieTableMetaClient(hadoopConf, tablePath.toString)
+  private lazy val tableType = metaClient.getTableType
+
+  @transient private var lastOffset: HoodieSourceOffset = _
+  @transient private lazy val initialPartitionOffsets = {
+    val metadataLog =
+      new HDFSMetadataLog[HoodieSourceOffset](sqlContext.sparkSession, metadataPath) {
+        override def serialize(metadata: HoodieSourceOffset, out: OutputStream): Unit = {
+          val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+          writer.write("v" + VERSION + "\n")
+          writer.write(metadata.json)
+          writer.flush()
+        }
+
+        override def deserialize(in: InputStream): HoodieSourceOffset = {
+          val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
+
+          val firstLineEnd = content.indexOf("\n")
+          if (firstLineEnd > 0) {
+            val version = getVersion(content.substring(0, firstLineEnd))
+            if (version > VERSION) {
+              throw new IllegalStateException(s"UnSupportVersion: max support version is: $VERSION" +
+                s" current version is: $version")
+            }
+            HoodieSourceOffset.fromJson(content.substring(firstLineEnd + 1))
+          } else {
+            throw new IllegalStateException(s"Bad metadata format, failed to find the version line.")
+          }
+        }
+      }
+    metadataLog.get(0).getOrElse {

Review comment:
       some comments on this code would be helpful

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieStreamSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+  * The Struct Stream Source for Hudi to consume the data by streaming job.
+  * @param sqlContext
+  * @param metadataPath
+  * @param schemaOption
+  * @param parameters
+  */
+class HoodieStreamSource(
+    sqlContext: SQLContext,
+    metadataPath: String,
+    schemaOption: Option[StructType],
+    parameters: Map[String, String])
+  extends Source with Logging with Serializable {
+
+  @transient private val hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+  private lazy val tablePath: Path = {
+    val path = new Path(parameters.getOrElse("path", "Missing 'path' option"))
+    val fs = path.getFileSystem(hadoopConf)
+    TablePathUtils.getTablePath(fs, path).get()
+  }
+  @transient private lazy val metaClient = new HoodieTableMetaClient(hadoopConf, tablePath.toString)
+  private lazy val tableType = metaClient.getTableType
+
+  @transient private var lastOffset: HoodieSourceOffset = _
+  @transient private lazy val initialPartitionOffsets = {
+    val metadataLog =
+      new HDFSMetadataLog[HoodieSourceOffset](sqlContext.sparkSession, metadataPath) {
+        override def serialize(metadata: HoodieSourceOffset, out: OutputStream): Unit = {
+          val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+          writer.write("v" + VERSION + "\n")
+          writer.write(metadata.json)
+          writer.flush()
+        }
+
+        override def deserialize(in: InputStream): HoodieSourceOffset = {
+          val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
+
+          val firstLineEnd = content.indexOf("\n")
+          if (firstLineEnd > 0) {
+            val version = getVersion(content.substring(0, firstLineEnd))
+            if (version > VERSION) {
+              throw new IllegalStateException(s"UnSupportVersion: max support version is: $VERSION" +
+                s" current version is: $version")
+            }
+            HoodieSourceOffset.fromJson(content.substring(firstLineEnd + 1))
+          } else {
+            throw new IllegalStateException(s"Bad metadata format, failed to find the version line.")
+          }
+        }
+      }
+    metadataLog.get(0).getOrElse {
+      metadataLog.add(0, INIT_OFFSET)
+      INIT_OFFSET
+    }
+  }
+
+  private def getVersion(versionLine: String): Int = {
+    if (versionLine.startsWith("v")) {
+      versionLine.substring(1).toInt
+    } else {
+      throw new IllegalStateException(s"Illegal version line: $versionLine " +
+        s"in the streaming metadata path")
+    }
+  }
+
+  override def schema: StructType = {
+    schemaOption.getOrElse {
+      val schemaUtil = new TableSchemaResolver(metaClient)
+      SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
+        .dataType.asInstanceOf[StructType]
+    }
+  }
+
+  override def getOffset: Option[Offset] = {

Review comment:
       just a rant. `Source#getOffset()` is such a bad name. its actually the latest offset.  :(

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieStreamSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+  * The Struct Stream Source for Hudi to consume the data by streaming job.
+  * @param sqlContext
+  * @param metadataPath
+  * @param schemaOption
+  * @param parameters
+  */
+class HoodieStreamSource(
+    sqlContext: SQLContext,
+    metadataPath: String,
+    schemaOption: Option[StructType],
+    parameters: Map[String, String])
+  extends Source with Logging with Serializable {
+
+  @transient private val hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+  private lazy val tablePath: Path = {
+    val path = new Path(parameters.getOrElse("path", "Missing 'path' option"))
+    val fs = path.getFileSystem(hadoopConf)
+    TablePathUtils.getTablePath(fs, path).get()
+  }
+  @transient private lazy val metaClient = new HoodieTableMetaClient(hadoopConf, tablePath.toString)
+  private lazy val tableType = metaClient.getTableType
+
+  @transient private var lastOffset: HoodieSourceOffset = _
+  @transient private lazy val initialPartitionOffsets = {
+    val metadataLog =
+      new HDFSMetadataLog[HoodieSourceOffset](sqlContext.sparkSession, metadataPath) {
+        override def serialize(metadata: HoodieSourceOffset, out: OutputStream): Unit = {
+          val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+          writer.write("v" + VERSION + "\n")
+          writer.write(metadata.json)
+          writer.flush()
+        }
+
+        override def deserialize(in: InputStream): HoodieSourceOffset = {
+          val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))

Review comment:
       FileIOUtils etc have a similar method

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming

Review comment:
       yeah I had same initial thoughts. but spark does not expose some key things like this. So we have resort to these tricks. Other projects do it too AFAIK

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieStreamSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+  * The Struct Stream Source for Hudi to consume the data by streaming job.
+  * @param sqlContext
+  * @param metadataPath
+  * @param schemaOption
+  * @param parameters
+  */
+class HoodieStreamSource(
+    sqlContext: SQLContext,
+    metadataPath: String,
+    schemaOption: Option[StructType],
+    parameters: Map[String, String])
+  extends Source with Logging with Serializable {
+
+  @transient private val hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+  private lazy val tablePath: Path = {
+    val path = new Path(parameters.getOrElse("path", "Missing 'path' option"))
+    val fs = path.getFileSystem(hadoopConf)
+    TablePathUtils.getTablePath(fs, path).get()
+  }
+  @transient private lazy val metaClient = new HoodieTableMetaClient(hadoopConf, tablePath.toString)

Review comment:
       this does serialize well actually

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils

Review comment:
       Please use the Hudi version of IOUtils. We need the same checkstyle rules applied for scala apache-commons is an illegal import in java code




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pengzhiwei2018 edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-775607006


   Hi @vinothchandar , All the comments have been dealt with. Please take a review again when you have time. And I also file a JIRA to support record level streaming consume at [HUDI-1601](https://issues.apache.org/jira/browse/HUDI-1601).


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar merged pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
vinothchandar merged pull request #2485:
URL: https://github.com/apache/hudi/pull/2485


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] ze-engineering-code-challenge removed a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
ze-engineering-code-challenge removed a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-781758598


   I have a curiosity, what will happen if I recreate the source table of streaming?
   
   For example, I have a tableA and a streaming using tableA as source and tableB as a sink, for any needs I reprocessed tableA, when Spark streaming try to read tableA, what will happen?
   
   I will try.
   
   @pengzhiwei2018 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pengzhiwei2018 edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-772986092


   > @pengzhiwei2018 I am planning to spend sometime on this as well.
   > 
   > High level question. does the `offset` for the streaming read map to `_hoodie_commit_seq_no` in this implementation. This way we can actually do record level streams and even resume where we left off.
   
   Hi @vinothchandar , you are welcome to join this. 
   Currently the `HoodieSourceOffset` just keep the `commitTime` . And every minBatch we consume the incremental data between `(lastCommitTime, currentCommitTime]`   If it failed during the consuming, It will recovered from the offset state and continue  to consuming the data between  `(lastCommitTime, currentCommitTime]`. It is a commit level recovery.
   Introducing `_hoodie_commit_seq_no` to the `offset` may makes recovery more fine-grained to the record level. But the problem is how can we know the max `commit_seq_no` in the commit.  In the `getOffset` method, we must tell spark which `commit_seq_no` we will read to in the min batch. Currently in the hoodie meta data, we just record the commit time for each commit. So this is problem for slicing the offset.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] rubenssoto commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
rubenssoto commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-781767655


   @pengzhiwei2018 
   
   no, for example, today with spark structured streaming in a regular parquet if my tableA as a source to my streaming, if I reprocess/recreate tableA spark streaming will process all new files of my reprocessed tableA
   
   if for any reason I need to recreate my tableA, what will happen to my streams?
   
   I dont know if I make myself clear


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r566178932



##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieStreamSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+  * The Struct Stream Source for Hudi to consume the data by streaming job.
+  * @param sqlContext
+  * @param metadataPath
+  * @param schemaOption
+  * @param parameters
+  */
+class HoodieStreamSource(
+    sqlContext: SQLContext,
+    metadataPath: String,
+    schemaOption: Option[StructType],
+    parameters: Map[String, String])
+  extends Source with Logging with Serializable {
+
+  @transient private val hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+  private lazy val tablePath: Path = {
+    val path = new Path(parameters.getOrElse("path", "Missing 'path' option"))
+    val fs = path.getFileSystem(hadoopConf)
+    TablePathUtils.getTablePath(fs, path).get()
+  }
+  @transient private lazy val metaClient = new HoodieTableMetaClient(hadoopConf, tablePath.toString)
+  private lazy val tableType = metaClient.getTableType
+
+  @transient private var lastOffset: HoodieSourceOffset = _
+  @transient private lazy val initialPartitionOffsets = {
+    val metadataLog =
+      new HDFSMetadataLog[HoodieSourceOffset](sqlContext.sparkSession, metadataPath) {

Review comment:
       Yes, it is a spark implement. And it use the FileSystem api call. S3 is compatible with HDFS FileSystem api.[https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.5/bk_cloud-data-access/content/s3-get-started.html](url)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] codecov-io edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766519181


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=h1) Report
   > Merging [#2485](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=desc) (64fb20b) into [master](https://codecov.io/gh/apache/hudi/commit/5d053b495b8cb44cce88a67e82cbdfdc3d8b3180?el=desc) (5d053b4) will **increase** coverage by `0.85%`.
   > The diff coverage is `67.52%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2485/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #2485      +/-   ##
   ============================================
   + Coverage     49.78%   50.63%   +0.85%     
   - Complexity     3089     3147      +58     
   ============================================
     Files           430      432       +2     
     Lines         19566    19683     +117     
     Branches       2004     2019      +15     
   ============================================
   + Hits           9741     9967     +226     
   + Misses         9033     8893     -140     
   - Partials        792      823      +31     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `37.21% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudiclient | `100.00% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudicommon | `51.49% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudiflink | `33.03% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudihadoopmr | `33.16% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudisparkdatasource | `69.42% <67.52%> (-0.09%)` | `0.00 <27.00> (ø)` | |
   | hudisync | `48.61% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | huditimelineservice | `66.49% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudiutilities | `69.43% <ø> (+7.56%)` | `0.00 <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [.../spark/sql/hudi/streaming/HoodieSourceOffset.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9zcGFyay9zcWwvaHVkaS9zdHJlYW1pbmcvSG9vZGllU291cmNlT2Zmc2V0LnNjYWxh) | `63.15% <63.15%> (ø)` | `4.00 <4.00> (?)` | |
   | [...src/main/scala/org/apache/hudi/DefaultSource.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9odWRpL0RlZmF1bHRTb3VyY2Uuc2NhbGE=) | `84.21% <64.28%> (-4.50%)` | `17.00 <2.00> (+2.00)` | :arrow_down: |
   | [.../spark/sql/hudi/streaming/HoodieStreamSource.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9zcGFyay9zcWwvaHVkaS9zdHJlYW1pbmcvSG9vZGllU3RyZWFtU291cmNlLnNjYWxh) | `69.04% <69.04%> (ø)` | `21.00 <21.00> (?)` | |
   | [...g/apache/hudi/MergeOnReadIncrementalRelation.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9odWRpL01lcmdlT25SZWFkSW5jcmVtZW50YWxSZWxhdGlvbi5zY2FsYQ==) | `83.05% <0.00%> (+0.84%)` | `22.00% <0.00%> (+1.00%)` | |
   | [...in/java/org/apache/hudi/utilities/UtilHelpers.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL1V0aWxIZWxwZXJzLmphdmE=) | `64.16% <0.00%> (+1.15%)` | `33.00% <0.00%> (+1.00%)` | |
   | [...i/utilities/deltastreamer/HoodieDeltaStreamer.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2RlbHRhc3RyZWFtZXIvSG9vZGllRGVsdGFTdHJlYW1lci5qYXZh) | `68.97% <0.00%> (+1.22%)` | `18.00% <0.00%> (+1.00%)` | |
   | [...ties/deltastreamer/HoodieDeltaStreamerMetrics.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2RlbHRhc3RyZWFtZXIvSG9vZGllRGVsdGFTdHJlYW1lck1ldHJpY3MuamF2YQ==) | `36.11% <0.00%> (+2.77%)` | `6.00% <0.00%> (+1.00%)` | |
   | [...utilities/deltastreamer/TableExecutionContext.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2RlbHRhc3RyZWFtZXIvVGFibGVFeGVjdXRpb25Db250ZXh0LmphdmE=) | `65.00% <0.00%> (+65.00%)` | `9.00% <0.00%> (+9.00%)` | |
   | ... and [1 more](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree-more) | |
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-772797323


   @pengzhiwei2018 I am planning to spend sometime on this as well. 
   
   High level question. does the `offset` for the streaming read map to `_hoodie_commit_seq_no` in this implementation. This way we can actually do record level streams and even resume. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766593559


   cc @garyli1019 mind taking a first pass at this PR? :) 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] codecov-io edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766519181


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=h1) Report
   > Merging [#2485](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=desc) (74974ac) into [master](https://codecov.io/gh/apache/hudi/commit/5d053b495b8cb44cce88a67e82cbdfdc3d8b3180?el=desc) (5d053b4) will **increase** coverage by `0.86%`.
   > The diff coverage is `67.52%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2485/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #2485      +/-   ##
   ============================================
   + Coverage     49.78%   50.65%   +0.86%     
   - Complexity     3089     3148      +59     
   ============================================
     Files           430      432       +2     
     Lines         19566    19683     +117     
     Branches       2004     2019      +15     
   ============================================
   + Hits           9741     9970     +229     
   + Misses         9033     8892     -141     
   - Partials        792      821      +29     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `37.21% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudiclient | `100.00% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudicommon | `51.51% <ø> (+0.02%)` | `0.00 <ø> (ø)` | |
   | hudiflink | `33.03% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudihadoopmr | `33.16% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudisparkdatasource | `69.42% <67.52%> (-0.09%)` | `0.00 <27.00> (ø)` | |
   | hudisync | `48.61% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | huditimelineservice | `66.49% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudiutilities | `69.48% <ø> (+7.61%)` | `0.00 <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [.../spark/sql/hudi/streaming/HoodieSourceOffset.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9zcGFyay9zcWwvaHVkaS9zdHJlYW1pbmcvSG9vZGllU291cmNlT2Zmc2V0LnNjYWxh) | `63.15% <63.15%> (ø)` | `4.00 <4.00> (?)` | |
   | [...src/main/scala/org/apache/hudi/DefaultSource.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9odWRpL0RlZmF1bHRTb3VyY2Uuc2NhbGE=) | `84.21% <64.28%> (-4.50%)` | `17.00 <2.00> (+2.00)` | :arrow_down: |
   | [.../spark/sql/hudi/streaming/HoodieStreamSource.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9zcGFyay9zcWwvaHVkaS9zdHJlYW1pbmcvSG9vZGllU3RyZWFtU291cmNlLnNjYWxh) | `69.04% <69.04%> (ø)` | `21.00 <21.00> (?)` | |
   | [...apache/hudi/utilities/deltastreamer/DeltaSync.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2RlbHRhc3RyZWFtZXIvRGVsdGFTeW5jLmphdmE=) | `70.86% <0.00%> (+0.35%)` | `51.00% <0.00%> (+1.00%)` | |
   | [...g/apache/hudi/MergeOnReadIncrementalRelation.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9odWRpL01lcmdlT25SZWFkSW5jcmVtZW50YWxSZWxhdGlvbi5zY2FsYQ==) | `83.05% <0.00%> (+0.84%)` | `22.00% <0.00%> (+1.00%)` | |
   | [...in/java/org/apache/hudi/utilities/UtilHelpers.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL1V0aWxIZWxwZXJzLmphdmE=) | `64.16% <0.00%> (+1.15%)` | `33.00% <0.00%> (+1.00%)` | |
   | [...i/utilities/deltastreamer/HoodieDeltaStreamer.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2RlbHRhc3RyZWFtZXIvSG9vZGllRGVsdGFTdHJlYW1lci5qYXZh) | `68.97% <0.00%> (+1.22%)` | `18.00% <0.00%> (+1.00%)` | |
   | [...e/hudi/common/table/log/HoodieLogFormatWriter.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL2xvZy9Ib29kaWVMb2dGb3JtYXRXcml0ZXIuamF2YQ==) | `79.68% <0.00%> (+1.56%)` | `26.00% <0.00%> (ø%)` | |
   | ... and [3 more](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree-more) | |
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pengzhiwei2018 edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-768221064


   > Can you check if this change is compatible with Spark 3.0.0?
   
   Hi @zhedoubushishi , The `Source` and `Offset` implement is still available  in the spark 3.0.0, But it needs to be integrated into the new `SourceProvider` before use it in the 3.0.0.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pengzhiwei2018 edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-772986092


   > @pengzhiwei2018 I am planning to spend sometime on this as well.
   > 
   > High level question. does the `offset` for the streaming read map to `_hoodie_commit_seq_no` in this implementation. This way we can actually do record level streams and even resume where we left off.
   
   Hi @vinothchandar , welcome to join this. 
   Currently the `HoodieSourceOffset` just keep the `commitTime` . In every min batch we consume the incremental data between `(lastCommitTime, currentCommitTime]`   If it failed during the consuming, It will recovered from the offset state and continue  to consuming the data between  `(lastCommitTime, currentCommitTime]`. It is a commit level recovery.
   Introducing `_hoodie_commit_seq_no` to the `offset` may makes recovery more fine-grained to the record level. But the problem is how can we know the max `commit_seq_no` in the commit.  In the `getOffset` method, we must tell spark which `commit_seq_no` we will read to in the min batch. Currently in the hoodie meta data, we just record the commit time for each commit. So this is problem for slicing the offset.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pengzhiwei2018 commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-774584018


   > > how can we know the max commit_seq_no in the commit
   > 
   > I think we should do what would be done for Kafka's case. or just use an accumulator to obtain this on each commit? Either way, lets file a follow up JIRA to allow record level streaming? We can do it in a follow up
   
   Yes, agree to file a JIRA to allow record level streaming!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] zhedoubushishi commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
zhedoubushishi commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r568854277



##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
+import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}

Review comment:
       If it's only compatible with Spark 2, we should leave it in ```hudi-spark2```. If it's compatible with both Spark2 and Spark3, we should leave it in ```hudi-spark```. 
   My understand is at least you need to make this change be able to compile with ```mvn packge -DskipTests -Dspark3```. Otherwise Hudi cannot compile with Spark 3 any more.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r571525627



##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
+import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}

Review comment:
       done for this!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pengzhiwei2018 commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-768221064


   > Can you check if this change is compatible with Spark 3.0.0?
   Hi @zhedoubushishi , The `Source` and `Offset` is still available  in the spark 3.0.0, But it needs to be integrated into the new `SourceProvider` before use it in the 3.0.0.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] codecov-io edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766519181


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=h1) Report
   > Merging [#2485](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=desc) (8d2ff66) into [master](https://codecov.io/gh/apache/hudi/commit/e302c6bc12c7eb764781898fdee8ee302ef4ec10?el=desc) (e302c6b) will **decrease** coverage by `40.49%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2485/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #2485       +/-   ##
   ============================================
   - Coverage     50.18%   9.68%   -40.50%     
   + Complexity     3050      48     -3002     
   ============================================
     Files           419      53      -366     
     Lines         18931    1930    -17001     
     Branches       1948     230     -1718     
   ============================================
   - Hits           9500     187     -9313     
   + Misses         8656    1730     -6926     
   + Partials        775      13      -762     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `?` | `?` | |
   | hudiclient | `?` | `?` | |
   | hudicommon | `?` | `?` | |
   | hudiflink | `?` | `?` | |
   | hudihadoopmr | `?` | `?` | |
   | hudisparkdatasource | `?` | `?` | |
   | hudisync | `?` | `?` | |
   | huditimelineservice | `?` | `?` | |
   | hudiutilities | `9.68% <ø> (-59.75%)` | `0.00 <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
   | [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-10.00%)` | |
   | [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [...apache/hudi/utilities/sources/JsonKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkthZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-6.00%)` | |
   | [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-5.00%)` | |
   | [...lities/schema/SchemaProviderWithPostProcessor.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQcm92aWRlcldpdGhQb3N0UHJvY2Vzc29yLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | ... and [395 more](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree-more) | |
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] codecov-io commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766519181


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=h1) Report
   > Merging [#2485](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=desc) (91cf083) into [master](https://codecov.io/gh/apache/hudi/commit/e302c6bc12c7eb764781898fdee8ee302ef4ec10?el=desc) (e302c6b) will **decrease** coverage by `40.49%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2485/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #2485       +/-   ##
   ============================================
   - Coverage     50.18%   9.68%   -40.50%     
   + Complexity     3050      48     -3002     
   ============================================
     Files           419      53      -366     
     Lines         18931    1930    -17001     
     Branches       1948     230     -1718     
   ============================================
   - Hits           9500     187     -9313     
   + Misses         8656    1730     -6926     
   + Partials        775      13      -762     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `?` | `?` | |
   | hudiclient | `?` | `?` | |
   | hudicommon | `?` | `?` | |
   | hudiflink | `?` | `?` | |
   | hudihadoopmr | `?` | `?` | |
   | hudisparkdatasource | `?` | `?` | |
   | hudisync | `?` | `?` | |
   | huditimelineservice | `?` | `?` | |
   | hudiutilities | `9.68% <ø> (-59.75%)` | `0.00 <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
   | [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-10.00%)` | |
   | [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [...apache/hudi/utilities/sources/JsonKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkthZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-6.00%)` | |
   | [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-5.00%)` | |
   | [...lities/schema/SchemaProviderWithPostProcessor.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQcm92aWRlcldpdGhQb3N0UHJvY2Vzc29yLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | ... and [395 more](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree-more) | |
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] zhedoubushishi commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
zhedoubushishi commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-768083458


   Can you check if this change is compatible with Spark 3.0.0?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] codecov-io edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766519181


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=h1) Report
   > Merging [#2485](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=desc) (fa0056e) into [master](https://codecov.io/gh/apache/hudi/commit/e302c6bc12c7eb764781898fdee8ee302ef4ec10?el=desc) (e302c6b) will **increase** coverage by `0.10%`.
   > The diff coverage is `67.52%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2485/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #2485      +/-   ##
   ============================================
   + Coverage     50.18%   50.28%   +0.10%     
   - Complexity     3050     3078      +28     
   ============================================
     Files           419      421       +2     
     Lines         18931    19048     +117     
     Branches       1948     1963      +15     
   ============================================
   + Hits           9500     9578      +78     
   - Misses         8656     8676      +20     
   - Partials        775      794      +19     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `37.21% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudiclient | `100.00% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudicommon | `51.47% <ø> (-0.03%)` | `0.00 <ø> (ø)` | |
   | hudiflink | `0.00% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudihadoopmr | `33.16% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudisparkdatasource | `66.05% <67.52%> (+0.19%)` | `0.00 <27.00> (ø)` | |
   | hudisync | `48.61% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | huditimelineservice | `66.49% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudiutilities | `69.43% <ø> (ø)` | `0.00 <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [.../spark/sql/hudi/streaming/HoodieSourceOffset.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9zcGFyay9zcWwvaHVkaS9zdHJlYW1pbmcvSG9vZGllU291cmNlT2Zmc2V0LnNjYWxh) | `63.15% <63.15%> (ø)` | `4.00 <4.00> (?)` | |
   | [...src/main/scala/org/apache/hudi/DefaultSource.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9odWRpL0RlZmF1bHRTb3VyY2Uuc2NhbGE=) | `84.21% <64.28%> (-4.50%)` | `17.00 <2.00> (+2.00)` | :arrow_down: |
   | [...apache/spark/sql/hudi/streaming/HoodieSource.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9zcGFyay9zcWwvaHVkaS9zdHJlYW1pbmcvSG9vZGllU291cmNlLnNjYWxh) | `69.04% <69.04%> (ø)` | `21.00 <21.00> (?)` | |
   | [...e/hudi/common/table/log/HoodieLogFormatWriter.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL2xvZy9Ib29kaWVMb2dGb3JtYXRXcml0ZXIuamF2YQ==) | `78.12% <0.00%> (-1.57%)` | `26.00% <0.00%> (ø%)` | |
   | [...g/apache/hudi/MergeOnReadIncrementalRelation.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9odWRpL01lcmdlT25SZWFkSW5jcmVtZW50YWxSZWxhdGlvbi5zY2FsYQ==) | `83.05% <0.00%> (+0.84%)` | `22.00% <0.00%> (+1.00%)` | |
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] codecov-io edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766519181


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=h1) Report
   > Merging [#2485](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=desc) (91cf083) into [master](https://codecov.io/gh/apache/hudi/commit/e302c6bc12c7eb764781898fdee8ee302ef4ec10?el=desc) (e302c6b) will **increase** coverage by `19.24%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2485/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree)
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #2485       +/-   ##
   =============================================
   + Coverage     50.18%   69.43%   +19.24%     
   + Complexity     3050      357     -2693     
   =============================================
     Files           419       53      -366     
     Lines         18931     1930    -17001     
     Branches       1948      230     -1718     
   =============================================
   - Hits           9500     1340     -8160     
   + Misses         8656      456     -8200     
   + Partials        775      134      -641     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `?` | `?` | |
   | hudiclient | `?` | `?` | |
   | hudicommon | `?` | `?` | |
   | hudiflink | `?` | `?` | |
   | hudihadoopmr | `?` | `?` | |
   | hudisparkdatasource | `?` | `?` | |
   | hudisync | `?` | `?` | |
   | huditimelineservice | `?` | `?` | |
   | hudiutilities | `69.43% <ø> (ø)` | `0.00 <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...src/main/java/org/apache/hudi/dla/DLASyncTool.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zeW5jL2h1ZGktZGxhLXN5bmMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZGxhL0RMQVN5bmNUb29sLmphdmE=) | | | |
   | [...rg/apache/hudi/metadata/HoodieMetadataPayload.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvbWV0YWRhdGEvSG9vZGllTWV0YWRhdGFQYXlsb2FkLmphdmE=) | | | |
   | [...rg/apache/hudi/common/fs/inline/InLineFSUtils.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL2ZzL2lubGluZS9JbkxpbmVGU1V0aWxzLmphdmE=) | | | |
   | [...pache/hudi/metadata/HoodieBackedTableMetadata.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvbWV0YWRhdGEvSG9vZGllQmFja2VkVGFibGVNZXRhZGF0YS5qYXZh) | | | |
   | [...g/apache/hudi/MergeOnReadIncrementalRelation.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9odWRpL01lcmdlT25SZWFkSW5jcmVtZW50YWxSZWxhdGlvbi5zY2FsYQ==) | | | |
   | [...udi/common/util/queue/BoundedInMemoryExecutor.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3V0aWwvcXVldWUvQm91bmRlZEluTWVtb3J5RXhlY3V0b3IuamF2YQ==) | | | |
   | [...oop/realtime/HoodieParquetRealtimeInputFormat.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1oYWRvb3AtbXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvaGFkb29wL3JlYWx0aW1lL0hvb2RpZVBhcnF1ZXRSZWFsdGltZUlucHV0Rm9ybWF0LmphdmE=) | | | |
   | [.../hive/SlashEncodedHourPartitionValueExtractor.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zeW5jL2h1ZGktaGl2ZS1zeW5jL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9odWRpL2hpdmUvU2xhc2hFbmNvZGVkSG91clBhcnRpdGlvblZhbHVlRXh0cmFjdG9yLmphdmE=) | | | |
   | [...va/org/apache/hudi/common/model/HoodieLogFile.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL21vZGVsL0hvb2RpZUxvZ0ZpbGUuamF2YQ==) | | | |
   | [.../src/main/java/org/apache/hudi/dla/util/Utils.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zeW5jL2h1ZGktZGxhLXN5bmMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZGxhL3V0aWwvVXRpbHMuamF2YQ==) | | | |
   | ... and [355 more](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree-more) | |
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] zhedoubushishi commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
zhedoubushishi commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r565508953



##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming

Review comment:
       Do we need to add ```org.apache.spark``` as the prefix? Shall we just use ```org.apache.hudi.streaming```?

##########
File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY}
+import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.config.HoodieWriteConfig.{DELETE_PARALLELISM, INSERT_PARALLELISM, TABLE_NAME, UPSERT_PARALLELISM}
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.{Row, SaveMode}
+
+class TestStreamingSource  extends StreamTest {

Review comment:
       [nit] duplicated space

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
+import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}

Review comment:
       This class is not compatible with Spark 3. I am thinking if we could move this class to ```hudi-spark2``` module (or any class that is not compatible with Spark 3). In the long term, we should make this change be compatible with both Spark 2 and Spark 3 but I think you can create a JIRA to track this for now. 
   
   You may run ```mvn clean install -Dspark3 -DskipTests``` to see if your change is work with Spark 3.
   
   

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieStreamSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+  * The Struct Stream Source for Hudi to consume the data by streaming job.
+  * @param sqlContext
+  * @param metadataPath
+  * @param schemaOption
+  * @param parameters
+  */
+class HoodieStreamSource(
+    sqlContext: SQLContext,
+    metadataPath: String,
+    schemaOption: Option[StructType],
+    parameters: Map[String, String])
+  extends Source with Logging with Serializable {
+
+  @transient private val hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+  private lazy val tablePath: Path = {
+    val path = new Path(parameters.getOrElse("path", "Missing 'path' option"))
+    val fs = path.getFileSystem(hadoopConf)
+    TablePathUtils.getTablePath(fs, path).get()
+  }
+  @transient private lazy val metaClient = new HoodieTableMetaClient(hadoopConf, tablePath.toString)
+  private lazy val tableType = metaClient.getTableType
+
+  @transient private var lastOffset: HoodieSourceOffset = _
+  @transient private lazy val initialPartitionOffsets = {
+    val metadataLog =
+      new HDFSMetadataLog[HoodieSourceOffset](sqlContext.sparkSession, metadataPath) {

Review comment:
       For my understanding, is ```HDFSMetadataLog``` also compatible with other DFS like s3?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] codecov-io edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766519181


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=h1) Report
   > Merging [#2485](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=desc) (55b8bc8) into [master](https://codecov.io/gh/apache/hudi/commit/e302c6bc12c7eb764781898fdee8ee302ef4ec10?el=desc) (e302c6b) will **decrease** coverage by `40.49%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2485/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #2485       +/-   ##
   ============================================
   - Coverage     50.18%   9.68%   -40.50%     
   + Complexity     3050      48     -3002     
   ============================================
     Files           419      53      -366     
     Lines         18931    1930    -17001     
     Branches       1948     230     -1718     
   ============================================
   - Hits           9500     187     -9313     
   + Misses         8656    1730     -6926     
   + Partials        775      13      -762     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `?` | `?` | |
   | hudiclient | `?` | `?` | |
   | hudicommon | `?` | `?` | |
   | hudiflink | `?` | `?` | |
   | hudihadoopmr | `?` | `?` | |
   | hudisparkdatasource | `?` | `?` | |
   | hudisync | `?` | `?` | |
   | huditimelineservice | `?` | `?` | |
   | hudiutilities | `9.68% <ø> (-59.75%)` | `0.00 <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
   | [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-10.00%)` | |
   | [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [...apache/hudi/utilities/sources/JsonKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkthZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-6.00%)` | |
   | [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-5.00%)` | |
   | [...lities/schema/SchemaProviderWithPostProcessor.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQcm92aWRlcldpdGhQb3N0UHJvY2Vzc29yLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | ... and [395 more](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree-more) | |
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r566165153



##########
File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY}
+import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.config.HoodieWriteConfig.{DELETE_PARALLELISM, INSERT_PARALLELISM, TABLE_NAME, UPSERT_PARALLELISM}
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.{Row, SaveMode}
+
+class TestStreamingSource  extends StreamTest {

Review comment:
       Thanks for your correct! 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r565127786



##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
##########
@@ -181,4 +184,35 @@ class DefaultSource extends RelationProvider
         .resolveRelation()
     }
   }
+
+  override def sourceSchema(sqlContext: SQLContext,
+                            schema: Option[StructType],
+                            providerName: String,
+                            parameters: Map[String, String]): (String, StructType) = {
+    val path = parameters.get("path")
+    if (path.isEmpty || path.get == null) {
+      throw new HoodieException(s"'path'  must be specified.")
+    }
+    val metaClient = new HoodieTableMetaClient(
+      sqlContext.sparkSession.sessionState.newHadoopConf(), path.get)
+    val schemaResolver = new TableSchemaResolver(metaClient)
+    val sqlSchema =
+      try {
+        val avroSchema = schemaResolver.getTableAvroSchema
+        SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]

Review comment:
       That's all right. I will have a try.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] rubenssoto commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

Posted by GitBox <gi...@apache.org>.
rubenssoto commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-781759949


   I have a curiosity, what will happen if I recreate the source table of streaming?
   
   For example, I have a tableA and a streaming using tableA as source and tableB as a sink, for any needs I reprocessed tableA, when Spark streaming try to read tableA, what will happen?
   
   I will try.
   
   @pengzhiwei2018 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org