Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/01/25 03:13:20 UTC
[GitHub] [hudi] pengzhiwei2018 opened a new pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
pengzhiwei2018 opened a new pull request #2485:
URL: https://github.com/apache/hudi/pull/2485
## *Tips*
- *Thank you very much for contributing to Apache Hudi.*
- *Please review https://hudi.apache.org/contributing.html before opening a pull request.*
## What is the purpose of the pull request
Support Spark Structured Streaming reads from Hudi COW & MOR tables.
## Brief change log
- Implement a HoodieSource to read a Hudi table with Spark Structured Streaming.
- Implement a HoodieSourceOffset to store the latest commit time that has been read.
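Conceptually, the two pieces above implement Spark's streaming `Source` contract: `getOffset` reports the latest commit time on the table's timeline, and `getBatch` returns the rows committed in the `(start, end]` range. A minimal self-contained sketch of that contract (illustrative names only, not the PR's actual classes):

```scala
// Toy model of the Source contract the new streaming source implements.
case class Commit(time: String, rows: Seq[String])

class ToySource(timeline: Seq[Commit]) {
  // Report the latest commit time on the timeline, if any.
  def getOffset: Option[String] = timeline.lastOption.map(_.time)

  // Incremental read: all rows in commits after `start`, up to and including `end`.
  def getBatch(start: Option[String], end: String): Seq[String] =
    timeline
      .filter(c => start.forall(c.time > _) && c.time <= end)
      .flatMap(_.rows)
}
```

Because commit times are monotonically increasing strings, lexicographic comparison is enough to delimit each micro-batch.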
## Verify this pull request
*(Please pick either of the following options)*
This change added tests and can be verified as follows:
- Add TestStreamingSource to test reading data from Hudi COW & MOR tables.
## Committer checklist
- [ ] Has a corresponding JIRA in PR title & commit
- [ ] Commit message is descriptive of the change
- [ ] CI is green
- [ ] Necessary doc changes done or have another open PR
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [hudi] vinothchandar edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
vinothchandar edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-772797323
@pengzhiwei2018 I am planning to spend some time on this as well.
High-level question: does the `offset` for the streaming read map to `_hoodie_commit_seq_no` in this implementation? That way we could actually do record-level streams and even resume where we left off.
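The distinction being raised can be sketched as follows: the PR tracks only a commit time, while a record-level offset would also carry a sequence number so a stream can resume mid-commit. This is a hypothetical illustration, not the PR's code:

```scala
// Commit-level offset (what the PR uses): resolution is one whole commit.
case class CommitOffset(commitTime: String)

// Hypothetical record-level offset keyed by _hoodie_commit_seq_no:
// strictly ordered within a commit as well as across commits, so a
// consumer could resume from the exact record where it left off.
case class RecordLevelOffset(commitTime: String, seqNo: Long) {
  def lessThan(other: RecordLevelOffset): Boolean =
    commitTime < other.commitTime ||
      (commitTime == other.commitTime && seqNo < other.seqNo)
}
```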
[GitHub] [hudi] pengzhiwei2018 edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
pengzhiwei2018 edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-768221064
> Can you check if this change is compatible with Spark 3.0.0?
Hi @zhedoubushishi, the `Source` and `Offset` implementations are still available in Spark 3.0.0, but they need to be integrated into the new `SourceProvider` before they can be used in 3.0.0.
[GitHub] [hudi] pengzhiwei2018 commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
pengzhiwei2018 commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-775607006
Hi @vinothchandar, all the comments have been addressed. Please take another look when you have time. I have also filed a JIRA to support record-level streaming consumption: [HUDI-1601](https://issues.apache.org/jira/browse/HUDI-1601).
[GitHub] [hudi] codecov-io edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
codecov-io edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766519181
# [Codecov](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=h1) Report
> Merging [#2485](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=desc) (eedd49b) into [master](https://codecov.io/gh/apache/hudi/commit/e302c6bc12c7eb764781898fdee8ee302ef4ec10?el=desc) (e302c6b) will **decrease** coverage by `40.49%`.
> The diff coverage is `n/a`.
[![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2485/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree)
```diff
@@ Coverage Diff @@
## master #2485 +/- ##
============================================
- Coverage 50.18% 9.68% -40.50%
+ Complexity 3050 48 -3002
============================================
Files 419 53 -366
Lines 18931 1930 -17001
Branches 1948 230 -1718
============================================
- Hits 9500 187 -9313
+ Misses 8656 1730 -6926
+ Partials 775 13 -762
```
| Flag | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| hudicli | `?` | `?` | |
| hudiclient | `?` | `?` | |
| hudicommon | `?` | `?` | |
| hudiflink | `?` | `?` | |
| hudihadoopmr | `?` | `?` | |
| hudisparkdatasource | `?` | `?` | |
| hudisync | `?` | `?` | |
| huditimelineservice | `?` | `?` | |
| hudiutilities | `9.68% <ø> (-59.75%)` | `0.00 <ø> (ø)` | |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
| [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
| [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
| [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
| [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
| [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-10.00%)` | |
| [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
| [...apache/hudi/utilities/sources/JsonKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkthZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-6.00%)` | |
| [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-5.00%)` | |
| [...lities/schema/SchemaProviderWithPostProcessor.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQcm92aWRlcldpdGhQb3N0UHJvY2Vzc29yLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
| ... and [395 more](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree-more) | |
[GitHub] [hudi] codecov-io edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
codecov-io edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766519181
# [Codecov](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=h1) Report
> Merging [#2485](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=desc) (5d3ec8d) into [master](https://codecov.io/gh/apache/hudi/commit/e302c6bc12c7eb764781898fdee8ee302ef4ec10?el=desc) (e302c6b) will **increase** coverage by `19.29%`.
> The diff coverage is `n/a`.
[![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2485/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree)
```diff
@@ Coverage Diff @@
## master #2485 +/- ##
=============================================
+ Coverage 50.18% 69.48% +19.29%
+ Complexity 3050 358 -2692
=============================================
Files 419 53 -366
Lines 18931 1930 -17001
Branches 1948 230 -1718
=============================================
- Hits 9500 1341 -8159
+ Misses 8656 456 -8200
+ Partials 775 133 -642
```
| Flag | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| hudicli | `?` | `?` | |
| hudiclient | `?` | `?` | |
| hudicommon | `?` | `?` | |
| hudiflink | `?` | `?` | |
| hudihadoopmr | `?` | `?` | |
| hudisparkdatasource | `?` | `?` | |
| hudisync | `?` | `?` | |
| huditimelineservice | `?` | `?` | |
| hudiutilities | `69.48% <ø> (+0.05%)` | `0.00 <ø> (ø)` | |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [...he/hudi/common/table/log/block/HoodieLogBlock.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL2xvZy9ibG9jay9Ib29kaWVMb2dCbG9jay5qYXZh) | | | |
| [...org/apache/hudi/cli/commands/BootstrapCommand.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jbGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY2xpL2NvbW1hbmRzL0Jvb3RzdHJhcENvbW1hbmQuamF2YQ==) | | | |
| [...ain/java/org/apache/hudi/avro/HoodieAvroUtils.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvYXZyby9Ib29kaWVBdnJvVXRpbHMuamF2YQ==) | | | |
| [...ache/hudi/common/fs/SizeAwareDataOutputStream.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL2ZzL1NpemVBd2FyZURhdGFPdXRwdXRTdHJlYW0uamF2YQ==) | | | |
| [...util/jvm/OpenJ9MemoryLayoutSpecification32bit.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3V0aWwvanZtL09wZW5KOU1lbW9yeUxheW91dFNwZWNpZmljYXRpb24zMmJpdC5qYXZh) | | | |
| [...a/org/apache/hudi/common/util/CompactionUtils.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3V0aWwvQ29tcGFjdGlvblV0aWxzLmphdmE=) | | | |
| [...n/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9odWRpL0hvb2RpZU1lcmdlT25SZWFkUkRELnNjYWxh) | | | |
| [...i/hive/SlashEncodedDayPartitionValueExtractor.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zeW5jL2h1ZGktaGl2ZS1zeW5jL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9odWRpL2hpdmUvU2xhc2hFbmNvZGVkRGF5UGFydGl0aW9uVmFsdWVFeHRyYWN0b3IuamF2YQ==) | | | |
| [...ache/hudi/exception/CorruptedLogFileException.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhjZXB0aW9uL0NvcnJ1cHRlZExvZ0ZpbGVFeGNlcHRpb24uamF2YQ==) | | | |
| [...org/apache/hudi/common/table/log/AppendResult.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL2xvZy9BcHBlbmRSZXN1bHQuamF2YQ==) | | | |
| ... and [356 more](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree-more) | |
[GitHub] [hudi] vinothchandar commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
vinothchandar commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766593559
cc @garyli1019 mind taking a first pass at this PR? :)
[GitHub] [hudi] vinothchandar commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
vinothchandar commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-774528989
> how can we know the max commit_seq_no in the commit

I think we should do what would be done in Kafka's case, or just use an accumulator to obtain this on each commit. Either way, let's file a follow-up JIRA to allow record-level streaming; we can do it in a follow-up.
[GitHub] [hudi] ze-engineering-code-challenge commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
ze-engineering-code-challenge commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-781758598
Out of curiosity, what will happen if I recreate the source table of the streaming job?
For example, I have tableA, and a streaming job using tableA as the source and tableB as the sink. If, for whatever reason, I reprocess tableA, what will happen when Spark streaming tries to read tableA?
I will try it.
@pengzhiwei2018
[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
pengzhiwei2018 commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r571518809
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
+import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
+
+case class HoodieSourceOffset(commitTime: String) extends Offset {
+
+ override def json(): String = {
+ HoodieSourceOffset.toJson(this)
+ }
+
+ override def equals(obj: Any): Boolean = {
+ obj match {
+ case HoodieSourceOffset(otherCommitTime) =>
+ otherCommitTime == commitTime
+ case _=> false
+ }
+ }
+
+ override def hashCode(): Int = {
+ commitTime.hashCode
+ }
+}
+
+
+object HoodieSourceOffset {
+ val mapper = new ObjectMapper with ScalaObjectMapper
+ mapper.setSerializationInclusion(Include.NON_ABSENT)
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+ mapper.registerModule(DefaultScalaModule)
+
+ def toJson(offset: HoodieSourceOffset): String = {
+ mapper.writeValueAsString(offset)
+ }
+
+ def fromJson(json: String): HoodieSourceOffset = {
+ mapper.readValue[HoodieSourceOffset](json)
+ }
+
+ def apply(offset: Offset): HoodieSourceOffset = {
+ offset match {
+ case SerializedOffset(json) => fromJson(json)
+ case o: HoodieSourceOffset => o
+ }
+ }
+
+ val INIT_OFFSET = HoodieSourceOffset("000")
Review comment:
Agree!
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
Review comment:
Thanks for your advice! Yes, currently it is quite simple. But in the long term, if we introduce the `commit_seq_no` or other info into the offset, we may need the JSON parser. And the Jackson dependency is already a dependency of Spark, so I prefer to keep it.
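The trade-off discussed here is JSON machinery versus a plain string. For a single field, the round-trip could be done without Jackson at all; a minimal sketch (illustrative only, the actual HoodieSourceOffset uses ObjectMapper):

```scala
// Hand-rolled JSON round-trip for a single-field offset, no Jackson needed.
case class SimpleOffset(commitTime: String) {
  def json: String = s"""{"commitTime":"$commitTime"}"""
}

object SimpleOffset {
  private val Pattern = """\{"commitTime":"([^"]*)"\}""".r

  def fromJson(s: String): SimpleOffset = s match {
    case Pattern(t) => SimpleOffset(t)
    case _          => throw new IllegalArgumentException(s"Bad offset json: $s")
  }
}
```

Once the offset grows more fields (e.g. a `commit_seq_no`), hand-rolled parsing becomes brittle and a real JSON mapper like Jackson pays off, which supports the argument for keeping it.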
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieStreamSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+ * The Struct Stream Source for Hudi to consume the data by streaming job.
+ * @param sqlContext
+ * @param metadataPath
+ * @param schemaOption
+ * @param parameters
+ */
+class HoodieStreamSource(
+ sqlContext: SQLContext,
+ metadataPath: String,
+ schemaOption: Option[StructType],
+ parameters: Map[String, String])
+ extends Source with Logging with Serializable {
+
+ @transient private val hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+ private lazy val tablePath: Path = {
+ val path = new Path(parameters.getOrElse("path", "Missing 'path' option"))
+ val fs = path.getFileSystem(hadoopConf)
+ TablePathUtils.getTablePath(fs, path).get()
+ }
+ @transient private lazy val metaClient = new HoodieTableMetaClient(hadoopConf, tablePath.toString)
Review comment:
ok!
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
Review comment:
Thanks for the advice!
##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkRowEncoder.java
##########
@@ -23,6 +23,8 @@
import java.io.Serializable;
-public interface SparkRowDeserializer extends Serializable {
+public interface SparkRowEncoder extends Serializable {
Review comment:
Agree this!
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieStreamSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+ * The Struct Stream Source for Hudi to consume the data by streaming job.
+ * @param sqlContext
+ * @param metadataPath
+ * @param schemaOption
+ * @param parameters
+ */
+class HoodieStreamSource(
+ sqlContext: SQLContext,
+ metadataPath: String,
+ schemaOption: Option[StructType],
+ parameters: Map[String, String])
+ extends Source with Logging with Serializable {
+
+ @transient private val hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+ private lazy val tablePath: Path = {
+ val path = new Path(parameters.getOrElse("path", "Missing 'path' option"))
+ val fs = path.getFileSystem(hadoopConf)
+ TablePathUtils.getTablePath(fs, path).get()
+ }
+ @transient private lazy val metaClient = new HoodieTableMetaClient(hadoopConf, tablePath.toString)
+ private lazy val tableType = metaClient.getTableType
+
+ @transient private var lastOffset: HoodieSourceOffset = _
+ @transient private lazy val initialPartitionOffsets = {
+ val metadataLog =
+ new HDFSMetadataLog[HoodieSourceOffset](sqlContext.sparkSession, metadataPath) {
+ override def serialize(metadata: HoodieSourceOffset, out: OutputStream): Unit = {
+ val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+ writer.write("v" + VERSION + "\n")
+ writer.write(metadata.json)
+ writer.flush()
+ }
+
+ override def deserialize(in: InputStream): HoodieSourceOffset = {
+ val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
+
+ val firstLineEnd = content.indexOf("\n")
+ if (firstLineEnd > 0) {
+ val version = getVersion(content.substring(0, firstLineEnd))
+ if (version > VERSION) {
+ throw new IllegalStateException(s"UnSupportVersion: max support version is: $VERSION" +
+ s" current version is: $version")
+ }
+ HoodieSourceOffset.fromJson(content.substring(firstLineEnd + 1))
+ } else {
+ throw new IllegalStateException(s"Bad metadata format, failed to find the version line.")
+ }
+ }
+ }
+ metadataLog.get(0).getOrElse {
+ metadataLog.add(0, INIT_OFFSET)
+ INIT_OFFSET
+ }
+ }
+
+ private def getVersion(versionLine: String): Int = {
+ if (versionLine.startsWith("v")) {
+ versionLine.substring(1).toInt
+ } else {
+ throw new IllegalStateException(s"Illegal version line: $versionLine " +
+ s"in the streaming metadata path")
+ }
+ }
+
+ override def schema: StructType = {
+ schemaOption.getOrElse {
+ val schemaUtil = new TableSchemaResolver(metaClient)
+ SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
+ .dataType.asInstanceOf[StructType]
+ }
+ }
+
+ override def getOffset: Option[Offset] = {
Review comment:
Yes, `getOffset` is not a very meaningful method name. However, it is defined in Spark's `Source` interface, so we cannot rename it, but we can add some comments to it.
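Since `getOffset` must return the latest available offset, and for Hudi that offset is essentially the latest commit time on the timeline, a comparable offset wrapper is all that is needed. A hypothetical sketch (the class and field names below are illustrative, not the PR's actual `HoodieSourceOffset` API):

```java
import java.util.Objects;

// Hypothetical offset modeled on the PR's HoodieSourceOffset: wraps the
// latest commit time that has been read from the Hudi timeline.
final class CommitOffset implements Comparable<CommitOffset> {
    // Hudi commit times are yyyyMMddHHmmss strings, so they sort lexicographically.
    final String commitTime;

    CommitOffset(String commitTime) {
        this.commitTime = Objects.requireNonNull(commitTime);
    }

    // JSON form, as Spark persists offsets via their json representation.
    String json() {
        return "{\"commitTime\":\"" + commitTime + "\"}";
    }

    @Override public int compareTo(CommitOffset o) {
        return commitTime.compareTo(o.commitTime);
    }

    @Override public boolean equals(Object o) {
        return o instanceof CommitOffset && commitTime.equals(((CommitOffset) o).commitTime);
    }

    @Override public int hashCode() {
        return commitTime.hashCode();
    }
}
```

With this shape, `getOffset` can simply compare the newest completed commit against the last offset it handed out and return the newer one.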
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieStreamSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+ * The Struct Stream Source for Hudi to consume the data by streaming job.
+ * @param sqlContext
+ * @param metadataPath
+ * @param schemaOption
+ * @param parameters
+ */
+class HoodieStreamSource(
+ sqlContext: SQLContext,
+ metadataPath: String,
+ schemaOption: Option[StructType],
+ parameters: Map[String, String])
+ extends Source with Logging with Serializable {
+
+ @transient private val hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+ private lazy val tablePath: Path = {
+ val path = new Path(parameters.getOrElse("path", "Missing 'path' option"))
+ val fs = path.getFileSystem(hadoopConf)
+ TablePathUtils.getTablePath(fs, path).get()
+ }
+ @transient private lazy val metaClient = new HoodieTableMetaClient(hadoopConf, tablePath.toString)
+ private lazy val tableType = metaClient.getTableType
+
+ @transient private var lastOffset: HoodieSourceOffset = _
+ @transient private lazy val initialPartitionOffsets = {
+ val metadataLog =
+ new HDFSMetadataLog[HoodieSourceOffset](sqlContext.sparkSession, metadataPath) {
+ override def serialize(metadata: HoodieSourceOffset, out: OutputStream): Unit = {
+ val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+ writer.write("v" + VERSION + "\n")
+ writer.write(metadata.json)
+ writer.flush()
+ }
+
+ override def deserialize(in: InputStream): HoodieSourceOffset = {
+ val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
+
+ val firstLineEnd = content.indexOf("\n")
+ if (firstLineEnd > 0) {
+ val version = getVersion(content.substring(0, firstLineEnd))
+ if (version > VERSION) {
+ throw new IllegalStateException(s"UnSupportVersion: max support version is: $VERSION" +
+ s" current version is: $version")
+ }
+ HoodieSourceOffset.fromJson(content.substring(firstLineEnd + 1))
+ } else {
+ throw new IllegalStateException(s"Bad metadata format, failed to find the version line.")
+ }
+ }
+ }
+ metadataLog.get(0).getOrElse {
Review comment:
Agree!
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieStreamSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+ * The Struct Stream Source for Hudi to consume the data by streaming job.
+ * @param sqlContext
+ * @param metadataPath
+ * @param schemaOption
+ * @param parameters
+ */
+class HoodieStreamSource(
+ sqlContext: SQLContext,
+ metadataPath: String,
+ schemaOption: Option[StructType],
+ parameters: Map[String, String])
+ extends Source with Logging with Serializable {
+
+ @transient private val hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+ private lazy val tablePath: Path = {
+ val path = new Path(parameters.getOrElse("path", "Missing 'path' option"))
+ val fs = path.getFileSystem(hadoopConf)
+ TablePathUtils.getTablePath(fs, path).get()
+ }
+ @transient private lazy val metaClient = new HoodieTableMetaClient(hadoopConf, tablePath.toString)
+ private lazy val tableType = metaClient.getTableType
+
+ @transient private var lastOffset: HoodieSourceOffset = _
+ @transient private lazy val initialPartitionOffsets = {
+ val metadataLog =
+ new HDFSMetadataLog[HoodieSourceOffset](sqlContext.sparkSession, metadataPath) {
+ override def serialize(metadata: HoodieSourceOffset, out: OutputStream): Unit = {
+ val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+ writer.write("v" + VERSION + "\n")
+ writer.write(metadata.json)
+ writer.flush()
+ }
+
+ override def deserialize(in: InputStream): HoodieSourceOffset = {
+ val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
+
+ val firstLineEnd = content.indexOf("\n")
+ if (firstLineEnd > 0) {
+ val version = getVersion(content.substring(0, firstLineEnd))
+ if (version > VERSION) {
+ throw new IllegalStateException(s"UnSupportVersion: max support version is: $VERSION" +
+ s" current version is: $version")
+ }
+ HoodieSourceOffset.fromJson(content.substring(firstLineEnd + 1))
+ } else {
+ throw new IllegalStateException(s"Bad metadata format, failed to find the version line.")
+ }
+ }
+ }
+ metadataLog.get(0).getOrElse {
+ metadataLog.add(0, INIT_OFFSET)
+ INIT_OFFSET
+ }
+ }
+
+ private def getVersion(versionLine: String): Int = {
+ if (versionLine.startsWith("v")) {
+ versionLine.substring(1).toInt
+ } else {
+ throw new IllegalStateException(s"Illegal version line: $versionLine " +
+ s"in the streaming metadata path")
+ }
+ }
+
+ override def schema: StructType = {
+ schemaOption.getOrElse {
+ val schemaUtil = new TableSchemaResolver(metaClient)
+ SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
+ .dataType.asInstanceOf[StructType]
+ }
+ }
+
+ override def getOffset: Option[Offset] = {
+ initialPartitionOffsets
Review comment:
Yes, it can be removed.
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieStreamSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+ * The Struct Stream Source for Hudi to consume the data by streaming job.
+ * @param sqlContext
+ * @param metadataPath
+ * @param schemaOption
+ * @param parameters
+ */
+class HoodieStreamSource(
+ sqlContext: SQLContext,
+ metadataPath: String,
+ schemaOption: Option[StructType],
+ parameters: Map[String, String])
+ extends Source with Logging with Serializable {
+
+ @transient private val hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+ private lazy val tablePath: Path = {
+ val path = new Path(parameters.getOrElse("path", "Missing 'path' option"))
+ val fs = path.getFileSystem(hadoopConf)
+ TablePathUtils.getTablePath(fs, path).get()
+ }
+ @transient private lazy val metaClient = new HoodieTableMetaClient(hadoopConf, tablePath.toString)
+ private lazy val tableType = metaClient.getTableType
+
+ @transient private var lastOffset: HoodieSourceOffset = _
+ @transient private lazy val initialPartitionOffsets = {
+ val metadataLog =
+ new HDFSMetadataLog[HoodieSourceOffset](sqlContext.sparkSession, metadataPath) {
+ override def serialize(metadata: HoodieSourceOffset, out: OutputStream): Unit = {
+ val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+ writer.write("v" + VERSION + "\n")
+ writer.write(metadata.json)
+ writer.flush()
+ }
+
+ override def deserialize(in: InputStream): HoodieSourceOffset = {
+ val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
Review comment:
Thank you for the advice!
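The quoted `serialize`/`deserialize` pair persists the offset as a version line (`"v" + VERSION + "\n"`) followed by the offset JSON, and rejects payloads written by a newer version. A minimal Java sketch of just that framing logic (the `OffsetFormat` name is illustrative, not a Hudi class):

```java
// Illustrative re-statement of the version-line framing used by the PR's
// HDFSMetadataLog serialize()/deserialize() overrides.
class OffsetFormat {
    static final int VERSION = 1;

    // Prefix the offset JSON payload with a version line, e.g. "v1\n{...}".
    static String serialize(String offsetJson) {
        return "v" + VERSION + "\n" + offsetJson;
    }

    // Split off the version line, validate it, and return the JSON payload.
    static String deserialize(String content) {
        int firstLineEnd = content.indexOf('\n');
        if (firstLineEnd <= 0) {
            throw new IllegalStateException("Bad metadata format, failed to find the version line.");
        }
        String versionLine = content.substring(0, firstLineEnd);
        if (!versionLine.startsWith("v")) {
            throw new IllegalStateException("Illegal version line: " + versionLine);
        }
        int version = Integer.parseInt(versionLine.substring(1));
        if (version > VERSION) {
            throw new IllegalStateException("Max supported version is " + VERSION
                + ", found version " + version);
        }
        return content.substring(firstLineEnd + 1);
    }
}
```

The version prefix lets a future release change the JSON layout while still failing fast, with a clear error, when an old reader encounters a newer metadata file.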
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r565128254
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+ * Struct Stream Source for Hudi.
Review comment:
All right! I will write more documentation for this.
[GitHub] [hudi] codecov-io edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766519181
# [Codecov](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=h1) Report
> Merging [#2485](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=desc) (56a0ae1) into [master](https://codecov.io/gh/apache/hudi/commit/5d053b495b8cb44cce88a67e82cbdfdc3d8b3180?el=desc) (5d053b4) will **increase** coverage by `1.22%`.
> The diff coverage is `64.51%`.
[![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2485/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree)
```diff
@@ Coverage Diff @@
## master #2485 +/- ##
============================================
+ Coverage 49.78% 51.01% +1.22%
- Complexity 3089 3197 +108
============================================
Files 430 435 +5
Lines 19566 19925 +359
Branches 2004 2047 +43
============================================
+ Hits 9741 10164 +423
+ Misses 9033 8925 -108
- Partials 792 836 +44
```
| Flag | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| hudicli | `36.90% <ø> (-0.31%)` | `0.00 <ø> (ø)` | |
| hudiclient | `100.00% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudicommon | `51.38% <ø> (-0.12%)` | `0.00 <ø> (ø)` | |
| hudiflink | `43.21% <ø> (+10.17%)` | `0.00 <ø> (ø)` | |
| hudihadoopmr | `33.16% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudisparkdatasource | `69.48% <64.51%> (-0.03%)` | `0.00 <27.00> (ø)` | |
| hudisync | `48.61% <ø> (ø)` | `0.00 <ø> (ø)` | |
| huditimelineservice | `66.49% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudiutilities | `69.46% <ø> (+7.60%)` | `0.00 <ø> (ø)` | |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [...rc/main/scala/org/apache/hudi/Spark2RowSerDe.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3BhcmsyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvaHVkaS9TcGFyazJSb3dTZXJEZS5zY2FsYQ==) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
| [...rc/main/scala/org/apache/hudi/Spark3RowSerDe.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3BhcmszL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvaHVkaS9TcGFyazNSb3dTZXJEZS5zY2FsYQ==) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
| [.../spark/sql/hudi/streaming/HoodieSourceOffset.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9zcGFyay9zcWwvaHVkaS9zdHJlYW1pbmcvSG9vZGllU291cmNlT2Zmc2V0LnNjYWxh) | `63.15% <63.15%> (ø)` | `4.00 <4.00> (?)` | |
| [...src/main/scala/org/apache/hudi/DefaultSource.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9odWRpL0RlZmF1bHRTb3VyY2Uuc2NhbGE=) | `84.21% <64.28%> (-4.50%)` | `17.00 <2.00> (+2.00)` | :arrow_down: |
| [.../main/scala/org/apache/hudi/HoodieSparkUtils.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9odWRpL0hvb2RpZVNwYXJrVXRpbHMuc2NhbGE=) | `88.88% <66.66%> (ø)` | `0.00 <0.00> (ø)` | |
| [.../spark/sql/hudi/streaming/HoodieStreamSource.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9zcGFyay9zcWwvaHVkaS9zdHJlYW1pbmcvSG9vZGllU3RyZWFtU291cmNlLnNjYWxh) | `68.67% <68.67%> (ø)` | `21.00 <21.00> (?)` | |
| [...ache/hudi/common/fs/inline/InMemoryFileSystem.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL2ZzL2lubGluZS9Jbk1lbW9yeUZpbGVTeXN0ZW0uamF2YQ==) | `79.31% <0.00%> (-10.35%)` | `15.00% <0.00%> (-1.00%)` | |
| [...a/org/apache/hudi/cli/commands/CommitsCommand.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jbGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY2xpL2NvbW1hbmRzL0NvbW1pdHNDb21tYW5kLmphdmE=) | `53.50% <0.00%> (-5.30%)` | `15.00% <0.00%> (ø%)` | |
| [...pache/hudi/common/table/HoodieTableMetaClient.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL0hvb2RpZVRhYmxlTWV0YUNsaWVudC5qYXZh) | `67.42% <0.00%> (-3.27%)` | `45.00% <0.00%> (ø%)` | |
| [.../org/apache/hudi/MergeOnReadSnapshotRelation.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9odWRpL01lcmdlT25SZWFkU25hcHNob3RSZWxhdGlvbi5zY2FsYQ==) | `89.13% <0.00%> (-1.46%)` | `17.00% <0.00%> (+1.00%)` | :arrow_down: |
| ... and [29 more](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree-more) | |
[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r565127355
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+ * Struct Stream Source for Hudi.
+ * @param sqlContext
+ * @param metadataPath
+ * @param schemaOption
+ * @param parameters
+ */
+class HoodieSource(
Review comment:
Thanks for your suggestion. Maybe HoodieStreamSource is suitable.
[GitHub] [hudi] pengzhiwei2018 commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-781765517
> I have a curiosity, what will happen if I recreate the source table of streaming?
>
> For example, I have a tableA and a streaming using tableA as source and tableB as a sink, for any needs I reprocessed tableA, when Spark streaming try to read tableA, what will happen?
>
> I will try.
>
> @pengzhiwei2018
Hi @rubenssoto, do you mean multiple streaming jobs consuming the same table simultaneously? That is not a problem: each streaming job keeps its own offset state, so the consumers do not influence each other.
[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r565127355
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+ * Struct Stream Source for Hudi.
+ * @param sqlContext
+ * @param metadataPath
+ * @param schemaOption
+ * @param parameters
+ */
+class HoodieSource(
Review comment:
Thanks for your suggestion. Maybe `HoodieStreamSource` would be a suitable name.
----------------------------------------------------------------
[GitHub] [hudi] zhedoubushishi commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
zhedoubushishi commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-767810591
> @pengzhiwei2018 thanks for your contribution. Left some comments but I am not quite familiar with Structured streaming. @zhedoubushishi mind taking a pass as well?
Sure will take a look.
----------------------------------------------------------------
[GitHub] [hudi] codecov-io edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766519181
# [Codecov](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=h1) Report
> Merging [#2485](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=desc) (8d2ff66) into [master](https://codecov.io/gh/apache/hudi/commit/e302c6bc12c7eb764781898fdee8ee302ef4ec10?el=desc) (e302c6b) will **increase** coverage by `19.24%`.
> The diff coverage is `n/a`.
[![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2485/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree)
```diff
@@ Coverage Diff @@
## master #2485 +/- ##
=============================================
+ Coverage 50.18% 69.43% +19.24%
+ Complexity 3050 357 -2693
=============================================
Files 419 53 -366
Lines 18931 1930 -17001
Branches 1948 230 -1718
=============================================
- Hits 9500 1340 -8160
+ Misses 8656 456 -8200
+ Partials 775 134 -641
```
| Flag | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| hudicli | `?` | `?` | |
| hudiclient | `?` | `?` | |
| hudicommon | `?` | `?` | |
| hudiflink | `?` | `?` | |
| hudihadoopmr | `?` | `?` | |
| hudisparkdatasource | `?` | `?` | |
| hudisync | `?` | `?` | |
| huditimelineservice | `?` | `?` | |
| hudiutilities | `69.43% <ø> (ø)` | `0.00 <ø> (ø)` | |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [...apache/hudi/common/engine/TaskContextSupplier.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL2VuZ2luZS9UYXNrQ29udGV4dFN1cHBsaWVyLmphdmE=) | | | |
| [...a/org/apache/hudi/cli/commands/CommitsCommand.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jbGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY2xpL2NvbW1hbmRzL0NvbW1pdHNDb21tYW5kLmphdmE=) | | | |
| [...pache/hudi/hadoop/HoodieColumnProjectionUtils.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1oYWRvb3AtbXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvaGFkb29wL0hvb2RpZUNvbHVtblByb2plY3Rpb25VdGlscy5qYXZh) | | | |
| [.../apache/hudi/common/config/SerializableSchema.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL2NvbmZpZy9TZXJpYWxpemFibGVTY2hlbWEuamF2YQ==) | | | |
| [...apache/hudi/common/fs/inline/InLineFileSystem.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL2ZzL2lubGluZS9JbkxpbmVGaWxlU3lzdGVtLmphdmE=) | | | |
| [...apache/hudi/common/util/collection/RocksDBDAO.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3V0aWwvY29sbGVjdGlvbi9Sb2Nrc0RCREFPLmphdmE=) | | | |
| [...common/table/view/AbstractTableFileSystemView.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL3ZpZXcvQWJzdHJhY3RUYWJsZUZpbGVTeXN0ZW1WaWV3LmphdmE=) | | | |
| [.../apache/hudi/exception/TableNotFoundException.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhjZXB0aW9uL1RhYmxlTm90Rm91bmRFeGNlcHRpb24uamF2YQ==) | | | |
| [.../apache/hudi/common/fs/ConsistencyGuardConfig.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL2ZzL0NvbnNpc3RlbmN5R3VhcmRDb25maWcuamF2YQ==) | | | |
| [...va/org/apache/hudi/common/model/CleanFileInfo.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL21vZGVsL0NsZWFuRmlsZUluZm8uamF2YQ==) | | | |
| ... and [355 more](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree-more) | |
----------------------------------------------------------------
[GitHub] [hudi] endgab commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
endgab commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-1061545712
Hello, I haven't found any official documentation about this feature. Is there any reason for that? Can you confirm that it is going to be maintained in the future?
--
[GitHub] [hudi] codecov-io edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766519181
# [Codecov](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=h1) Report
> Merging [#2485](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=desc) (fa0056e) into [master](https://codecov.io/gh/apache/hudi/commit/e302c6bc12c7eb764781898fdee8ee302ef4ec10?el=desc) (e302c6b) will **decrease** coverage by `0.15%`.
> The diff coverage is `67.52%`.
[![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2485/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree)
```diff
@@ Coverage Diff @@
## master #2485 +/- ##
============================================
- Coverage 50.18% 50.02% -0.16%
+ Complexity 3050 2895 -155
============================================
Files 419 400 -19
Lines 18931 17614 -1317
Branches 1948 1829 -119
============================================
- Hits 9500 8812 -688
+ Misses 8656 8074 -582
+ Partials 775 728 -47
```
| Flag | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| hudicli | `37.21% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudiclient | `100.00% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudicommon | `51.47% <ø> (-0.03%)` | `0.00 <ø> (ø)` | |
| hudiflink | `0.00% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudihadoopmr | `33.16% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudisparkdatasource | `66.05% <67.52%> (+0.19%)` | `0.00 <27.00> (ø)` | |
| hudisync | `?` | `?` | |
| huditimelineservice | `?` | `?` | |
| hudiutilities | `69.43% <ø> (ø)` | `0.00 <ø> (ø)` | |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [.../spark/sql/hudi/streaming/HoodieSourceOffset.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9zcGFyay9zcWwvaHVkaS9zdHJlYW1pbmcvSG9vZGllU291cmNlT2Zmc2V0LnNjYWxh) | `63.15% <63.15%> (ø)` | `4.00 <4.00> (?)` | |
| [...src/main/scala/org/apache/hudi/DefaultSource.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9odWRpL0RlZmF1bHRTb3VyY2Uuc2NhbGE=) | `84.21% <64.28%> (-4.50%)` | `17.00 <2.00> (+2.00)` | :arrow_down: |
| [...apache/spark/sql/hudi/streaming/HoodieSource.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9zcGFyay9zcWwvaHVkaS9zdHJlYW1pbmcvSG9vZGllU291cmNlLnNjYWxh) | `69.04% <69.04%> (ø)` | `21.00 <21.00> (?)` | |
| [...e/hudi/common/table/log/HoodieLogFormatWriter.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL2xvZy9Ib29kaWVMb2dGb3JtYXRXcml0ZXIuamF2YQ==) | `78.12% <0.00%> (-1.57%)` | `26.00% <0.00%> (ø%)` | |
| [.../org/apache/hudi/hive/NonPartitionedExtractor.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zeW5jL2h1ZGktaGl2ZS1zeW5jL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9odWRpL2hpdmUvTm9uUGFydGl0aW9uZWRFeHRyYWN0b3IuamF2YQ==) | | | |
| [...in/java/org/apache/hudi/hive/SchemaDifference.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zeW5jL2h1ZGktaGl2ZS1zeW5jL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9odWRpL2hpdmUvU2NoZW1hRGlmZmVyZW5jZS5qYXZh) | | | |
| [...e/hudi/timeline/service/FileSystemViewHandler.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS10aW1lbGluZS1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9odWRpL3RpbWVsaW5lL3NlcnZpY2UvRmlsZVN5c3RlbVZpZXdIYW5kbGVyLmphdmE=) | | | |
| [.../org/apache/hudi/hive/HoodieHiveSyncException.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zeW5jL2h1ZGktaGl2ZS1zeW5jL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9odWRpL2hpdmUvSG9vZGllSGl2ZVN5bmNFeGNlcHRpb24uamF2YQ==) | | | |
| [...i/hive/SlashEncodedDayPartitionValueExtractor.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zeW5jL2h1ZGktaGl2ZS1zeW5jL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9odWRpL2hpdmUvU2xhc2hFbmNvZGVkRGF5UGFydGl0aW9uVmFsdWVFeHRyYWN0b3IuamF2YQ==) | | | |
| [...c/main/java/org/apache/hudi/dla/DLASyncConfig.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zeW5jL2h1ZGktZGxhLXN5bmMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZGxhL0RMQVN5bmNDb25maWcuamF2YQ==) | | | |
| ... and [18 more](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree-more) | |
----------------------------------------------------------------
[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r569077912
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
+import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}
Review comment:
Thanks @zhedoubushishi for your advice. I will fix this problem later.
----------------------------------------------------------------
[GitHub] [hudi] zhedoubushishi commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
zhedoubushishi commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r568854277
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
+import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}
Review comment:
If it's only compatible with Spark 2, we should leave it in ```hudi-spark2```. If it's compatible with both Spark 2 and Spark 3, we should leave it in ```hudi-spark```.
My understanding is that, at a minimum, this change needs to compile with ```mvn package -DskipTests -Dspark3```. Otherwise Hudi can no longer be compiled against Spark 3.
----------------------------------------------------------------
[GitHub] [hudi] rubenssoto commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
rubenssoto commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-781777638
good to know...
thank you @pengzhiwei2018 !!!
----------------------------------------------------------------
[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r566190350
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
+import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}
Review comment:
Thanks for your suggestion. I have tested with -Dspark3, and `HoodieSourceOffset` does not work for Spark 3.
I am not sure whether we should leave it in `hudi-spark` or `hudi-spark2`. Currently most of the Spark code stays in `hudi-spark`; if we move this to `hudi-spark2` now, a lot of other related code would also have to move. So I think we can create a JIRA to track this for now, and in the long term I will provide an implementation for Spark 3.
Here is the JIRA: https://issues.apache.org/jira/browse/HUDI-1558
----------------------------------------------------------------
[GitHub] [hudi] pengzhiwei2018 commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-772986092
> @pengzhiwei2018 I am planning to spend some time on this as well.
>
> High-level question: does the `offset` for the streaming read map to `_hoodie_commit_seq_no` in this implementation? This way we can actually do record-level streams and even resume where we left off.
Hi @vinothchandar, you are welcome to join this.
Currently `HoodieSourceOffset` just keeps the `commitTime`, and in every micro-batch we consume the incremental data between **(lastCommitTime, currentCommitTime]**. If the job fails during consumption, it recovers from the offset state and continues consuming the data between **(lastCommitTime, currentCommitTime]**. So recovery happens at the commit level.
Introducing `_hoodie_commit_seq_no` into the `offset` could make recovery more fine-grained, down to the record level. But the problem is how to determine the max `commit_seq_no` within a commit: in the `getOffset` method, we must tell Spark which `commit_seq_no` the micro-batch will read up to, and currently the Hoodie metadata only records the commit time for each commit. So slicing the offset at the record level is a problem.
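The commit-time offset semantics described above can be sketched as follows. This is a minimal, hypothetical illustration — the `HoodieSourceOffset` and `commitsInRange` here are simplified stand-ins, not the PR's actual classes; real commit times are Hudi timeline timestamps, which compare correctly as strings:

```scala
// Sketch: a commit-time-based offset and the (lastCommitTime, currentCommitTime]
// micro-batch range it implies. Names are illustrative, not the PR's real API.
case class HoodieSourceOffset(commitTime: String)

object OffsetRangeSketch {
  // Commits the next micro-batch should read: strictly after the start offset's
  // commit time, up to and including the end offset's commit time.
  def commitsInRange(allCommits: Seq[String],
                     start: HoodieSourceOffset,
                     end: HoodieSourceOffset): Seq[String] =
    allCommits.filter(c => c > start.commitTime && c <= end.commitTime)

  def main(args: Array[String]): Unit = {
    val commits = Seq("20210101000000", "20210102000000", "20210103000000")
    val batch = commitsInRange(
      commits,
      HoodieSourceOffset("20210101000000"),
      HoodieSourceOffset("20210103000000"))
    // The start commit itself is excluded; everything after it is replayed.
    println(batch)
  }
}
```

Because only the commit time is stored, a batch that fails mid-way is re-read in full from `lastCommitTime` on restart — which is exactly why recovery is commit-level rather than record-level.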
----------------------------------------------------------------
[GitHub] [hudi] garyli1019 commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
garyli1019 commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r564535063
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
##########
@@ -181,4 +184,35 @@ class DefaultSource extends RelationProvider
.resolveRelation()
}
}
+
+ override def sourceSchema(sqlContext: SQLContext,
+ schema: Option[StructType],
+ providerName: String,
+ parameters: Map[String, String]): (String, StructType) = {
+ val path = parameters.get("path")
+ if (path.isEmpty || path.get == null) {
+ throw new HoodieException(s"'path' must be specified.")
+ }
+ val metaClient = new HoodieTableMetaClient(
+ sqlContext.sparkSession.sessionState.newHadoopConf(), path.get)
+ val schemaResolver = new TableSchemaResolver(metaClient)
+ val sqlSchema =
+ try {
+ val avroSchema = schemaResolver.getTableAvroSchema
+ SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
Review comment:
I think we should use the hudi internal converter from `AvroConversionUtils`, to be consistent with others.
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+ * Struct Stream Source for Hudi.
+ * @param sqlContext
+ * @param metadataPath
+ * @param schemaOption
+ * @param parameters
+ */
+class HoodieSource(
Review comment:
More specific name? like `StructuredStreamingHoodieSource`?
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+ * Struct Stream Source for Hudi.
+ * @param sqlContext
+ * @param metadataPath
+ * @param schemaOption
+ * @param parameters
+ */
+class HoodieSource(
+ sqlContext: SQLContext,
+ metadataPath: String,
+ schemaOption: Option[StructType],
+ parameters: Map[String, String])
+ extends Source with Logging with Serializable {
+
+ @transient private val hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+ private lazy val tablePath: Path = {
+ val path = new Path(parameters.getOrElse("path", "Missing 'path' option"))
+ val fs = path.getFileSystem(hadoopConf)
+ TablePathUtils.getTablePath(fs, path).get()
+ }
+ @transient private lazy val metaClient = new HoodieTableMetaClient(hadoopConf, tablePath.toString)
+ private lazy val tableType = metaClient.getTableType
+
+ @transient private var lastOffset: HoodieSourceOffset = _
+ @transient private lazy val initialPartitionOffsets = {
+ val metadataLog =
+ new HDFSMetadataLog[HoodieSourceOffset](sqlContext.sparkSession, metadataPath) {
+ override def serialize(metadata: HoodieSourceOffset, out: OutputStream): Unit = {
+ val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+ writer.write("v" + VERSION + "\n")
+ writer.write(metadata.json)
+ writer.flush()
+ }
+
+ override def deserialize(in: InputStream): HoodieSourceOffset = {
+ val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
+
+ val firstLineEnd = content.indexOf("\n")
+ if (firstLineEnd > 0) {
+ val version = getVersion(content.substring(0, firstLineEnd))
+ if (version > VERSION) {
+ throw new IllegalStateException(s"Unsupported version: max supported version is: $VERSION" +
+ s", current version is: $version")
+ }
+ HoodieSourceOffset.fromJson(content.substring(firstLineEnd + 1))
+ } else {
+ throw new IllegalStateException(s"Bad metadata format, failed to find the version line.")
+ }
+ }
+ }
+ metadataLog.get(0).getOrElse {
+ metadataLog.add(0, INIT_OFFSET)
+ INIT_OFFSET
+ }
+ }
+
+ private def getVersion(versionLine: String): Int = {
+ if (versionLine.startsWith("v")) {
+ versionLine.substring(1).toInt
+ } else {
+ throw new IllegalStateException(s"Illegal version line: $versionLine " +
+ s"in the streaming metadata path")
+ }
+ }
+
+ override def schema: StructType = {
+ schemaOption.getOrElse {
+ val schemaUtil = new TableSchemaResolver(metaClient)
+ SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
+ .dataType.asInstanceOf[StructType]
+ }
+ }
+
+ override def getOffset: Option[Offset] = {
+ initialPartitionOffsets
+
+ metaClient.reloadActiveTimeline()
+ val activeInstants = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
+ if (!activeInstants.empty()) {
+ val currentLatestCommitTime = activeInstants.lastInstant().get().getTimestamp
+ if (lastOffset == null || currentLatestCommitTime > lastOffset.commitTime) {
+ lastOffset = HoodieSourceOffset(currentLatestCommitTime)
+ }
+ } else { // if there are no active commits, use the init offset
+ lastOffset = initialPartitionOffsets
+ }
+ Some(lastOffset)
+ }
+
+ override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+ initialPartitionOffsets
+
+ val startOffset = start.map(HoodieSourceOffset(_))
+ .getOrElse(initialPartitionOffsets)
+ val endOffset = HoodieSourceOffset(end)
+
+ if (startOffset == endOffset) {
+ sqlContext.internalCreateDataFrame(
+ sqlContext.sparkContext.emptyRDD[InternalRow].setName("empty"), schema, isStreaming = true)
+ } else {
+ // Consume the data between (startCommitTime, endCommitTime]
+ val incParams = parameters ++ Map(
+ DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY -> startCommitTime(startOffset),
+ DataSourceReadOptions.END_INSTANTTIME_OPT_KEY -> endOffset.commitTime
+ )
+
+ val rdd = tableType match {
+ case HoodieTableType.COPY_ON_WRITE =>
+ val encoder = RowEncoder(schema)
+ new IncrementalRelation(sqlContext, incParams, schema, metaClient)
+ .buildScan()
+ .map(encoder.toRow)
+ case HoodieTableType.MERGE_ON_READ =>
+ val requiredColumns = schema.fields.map(_.name)
+ new MergeOnReadIncrementalRelation(sqlContext, incParams, schema, metaClient)
+ .buildScan(requiredColumns, Array.empty[Filter])
+ .asInstanceOf[RDD[InternalRow]]
+ case _ => throw new IllegalArgumentException(s"Unsupported tableType: $tableType")
+ }
+ sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
+ }
+ }
+
+ private def startCommitTime(startOffset: HoodieSourceOffset): String = {
+ startOffset match {
+ case INIT_OFFSET => startOffset.commitTime
+ case HoodieSourceOffset(commitTime) =>
+ val time = HoodieActiveTimeline.COMMIT_FORMATTER.parse(commitTime).getTime
+ // As we consume the data between (start, end] and start is not included,
+ // we add one second to the start commit time here.
+ HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date(time + 1000))
+ case _ => throw new IllegalStateException("Unknown offset type.")
+ }
+ }
+
+ override def stop(): Unit = {}
+}
+
+object HoodieSource {
+ val VERSION = 1
Review comment:
Initial version?
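For reference, the on-disk offset format written by the `serialize`/`deserialize` pair above can be sketched in Python. This is a sketch only: the version header line (`v1`) matches the code above, but the exact JSON field layout is an assumption based on `HoodieSourceOffset(commitTime)`.

```python
import json

VERSION = 1  # mirrors HoodieSource.VERSION above


def serialize(commit_time):
    # Version line first, then the offset JSON, as in serialize() above.
    return "v%d\n" % VERSION + json.dumps({"commitTime": commit_time})


def deserialize(content):
    # Split off the version line, validate it, then parse the JSON body.
    header, _, body = content.partition("\n")
    if not header.startswith("v"):
        raise ValueError("Bad metadata format, failed to find the version line.")
    version = int(header[1:])
    if version > VERSION:
        raise ValueError("max supported version is %d, got %d" % (VERSION, version))
    return json.loads(body)["commitTime"]
```

Writing the version on its own first line lets a newer reader reject metadata written by a future, incompatible source version before attempting to parse the body.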
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+ * Struct Stream Source for Hudi.
Review comment:
We could add more docs. This sounds like using structured streaming to sink to Hudi.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [hudi] codecov-io edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766519181
# [Codecov](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=h1) Report
> Merging [#2485](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=desc) (f9a4121) into [master](https://codecov.io/gh/apache/hudi/commit/e302c6bc12c7eb764781898fdee8ee302ef4ec10?el=desc) (e302c6b) will **decrease** coverage by `5.65%`.
> The diff coverage is `67.52%`.
[![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2485/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree)
```diff
@@             Coverage Diff              @@
##             master    #2485      +/-   ##
============================================
- Coverage     50.18%   44.52%    -5.66%
+ Complexity     3050     2838      -212
============================================
  Files           419      432       +13
  Lines         18931    19682      +751
  Branches       1948     2019       +71
============================================
- Hits           9500     8764      -736
- Misses         8656    10217     +1561
+ Partials        775      701       -74
```
| Flag | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| hudicli | `37.21% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudiclient | `100.00% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudicommon | `51.51% <ø> (+0.02%)` | `0.00 <ø> (ø)` | |
| hudiflink | `33.03% <ø> (+33.03%)` | `0.00 <ø> (ø)` | |
| hudihadoopmr | `33.16% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudisparkdatasource | `66.05% <67.52%> (+0.19%)` | `0.00 <27.00> (ø)` | |
| hudisync | `48.61% <ø> (ø)` | `0.00 <ø> (ø)` | |
| huditimelineservice | `66.49% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudiutilities | `9.68% <ø> (-59.75%)` | `0.00 <ø> (ø)` | |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [.../spark/sql/hudi/streaming/HoodieSourceOffset.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9zcGFyay9zcWwvaHVkaS9zdHJlYW1pbmcvSG9vZGllU291cmNlT2Zmc2V0LnNjYWxh) | `63.15% <63.15%> (ø)` | `4.00 <4.00> (?)` | |
| [...src/main/scala/org/apache/hudi/DefaultSource.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9odWRpL0RlZmF1bHRTb3VyY2Uuc2NhbGE=) | `84.21% <64.28%> (-4.50%)` | `17.00 <2.00> (+2.00)` | :arrow_down: |
| [.../spark/sql/hudi/streaming/HoodieStreamSource.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9zcGFyay9zcWwvaHVkaS9zdHJlYW1pbmcvSG9vZGllU3RyZWFtU291cmNlLnNjYWxh) | `69.04% <69.04%> (ø)` | `21.00 <21.00> (?)` | |
| [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
| [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
| [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
| [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
| [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
| [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-10.00%)` | |
| [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
| ... and [55 more](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree-more) | |
[GitHub] [hudi] pengzhiwei2018 commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-781777083
> @pengzhiwei2018
>
> no, for example, today with spark structured streaming in a regular parquet if my tableA as a source to my streaming, if I reprocess/recreate tableA spark streaming will process all new files of my reprocessed tableA
>
> if for any reason I need to recreate my tableA, what will happen to my streams?
>
> I dont know if I make myself clear
Hi @rubenssoto.
If the table has been recreated, the offset of the stream source should be reset (e.g. use another checkpoint directory or delete the old checkpoint directory). Otherwise, the old offset may not match the newly recreated table and we cannot read the data correctly.
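For example, clearing the stale offset state before restarting the query could look like the following sketch (the checkpoint path here is hypothetical):

```python
import os
import shutil

# Hypothetical checkpoint location used by the streaming query.
checkpoint_dir = "/tmp/hudi_stream_checkpoint"

# After recreating the source table, drop the stale offset state so the
# stream restarts from the initial offset instead of a mismatched commit time.
if os.path.exists(checkpoint_dir):
    shutil.rmtree(checkpoint_dir)
```

Alternatively, simply pointing `checkpointLocation` at a fresh directory achieves the same effect without deleting anything.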
[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r565128033
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSource.scala
##########
@@ -0,0 +1,185 @@
+
+object HoodieSource {
+ val VERSION = 1
Review comment:
Yes, it is the first version number of this source.
[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r566168728
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
Review comment:
Hi @zhedoubushishi, some methods like `sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)` are private to every package except `org.apache.spark.sql`, so I moved the stream source under it. I also referenced other structured streaming sources like `kafka`, which do the same thing.
[GitHub] [hudi] pengzhiwei2018 edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-772986092
> @pengzhiwei2018 I am planning to spend sometime on this as well.
>
> High level question. does the `offset` for the streaming read map to `_hoodie_commit_seq_no` in this implementation. This way we can actually do record level streams and even resume where we left off.
Hi @vinothchandar , welcome to join this.
Currently the `HoodieSourceOffset` just keeps the `commitTime`. In every micro-batch we consume the incremental data between `(lastCommitTime, currentCommitTime]`. If the query fails during consumption, it will recover from the offset state and continue consuming the data between `(lastCommitTime, currentCommitTime]`. It is commit-level recovery.
Introducing `_hoodie_commit_seq_no` to the `offset` may make recovery more fine-grained, down to the record level. But the problem is how we can know the max `commit_seq_no` in a commit: in the `getOffset` method, we must tell Spark which `commit_seq_no` we will read up to in the micro-batch, and currently the hoodie metadata only records the commit time for each commit. So slicing the offset is a problem.
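The exclusive-start interval described above relies on Hudi's second-granularity commit timestamps; the `startCommitTime` adjustment in the PR can be sketched as follows (a sketch, assuming the `yyyyMMddHHmmss` commit-time format used by `HoodieActiveTimeline.COMMIT_FORMATTER`):

```python
from datetime import datetime, timedelta

COMMIT_FORMAT = "%Y%m%d%H%M%S"  # Hudi commit times have second granularity


def start_commit_time(last_commit_time):
    """Shift the last consumed commit time forward by one second so the
    incremental read over (start, end] excludes the already-read commit."""
    t = datetime.strptime(last_commit_time, COMMIT_FORMAT)
    return (t + timedelta(seconds=1)).strftime(COMMIT_FORMAT)
```

Because recovery replays whole `(lastCommitTime, currentCommitTime]` intervals, a failed micro-batch is re-read in full, which is what makes the recovery commit-level rather than record-level.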
[GitHub] [hudi] codecov-io edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766519181
# [Codecov](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=h1) Report
> Merging [#2485](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=desc) (cb80c4f) into [master](https://codecov.io/gh/apache/hudi/commit/5d053b495b8cb44cce88a67e82cbdfdc3d8b3180?el=desc) (5d053b4) will **increase** coverage by `19.64%`.
> The diff coverage is `n/a`.
[![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2485/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree)
```diff
@@              Coverage Diff              @@
##             master    #2485       +/-   ##
=============================================
+ Coverage     49.78%   69.43%   +19.64%
+ Complexity     3089      357     -2732
=============================================
  Files           430       53      -377
  Lines         19566     1930    -17636
  Branches       2004      230     -1774
=============================================
- Hits           9741     1340     -8401
+ Misses         9033      456     -8577
+ Partials        792      134      -658
```
| Flag | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| hudicli | `?` | `?` | |
| hudiclient | `?` | `?` | |
| hudicommon | `?` | `?` | |
| hudiflink | `?` | `?` | |
| hudihadoopmr | `?` | `?` | |
| hudisparkdatasource | `?` | `?` | |
| hudisync | `?` | `?` | |
| huditimelineservice | `?` | `?` | |
| hudiutilities | `69.43% <ø> (+7.56%)` | `0.00 <ø> (ø)` | |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [...g/apache/hudi/cli/utils/SparkTempViewProvider.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jbGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY2xpL3V0aWxzL1NwYXJrVGVtcFZpZXdQcm92aWRlci5qYXZh) | | | |
| [...ain/java/org/apache/hudi/cli/utils/CommitUtil.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jbGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY2xpL3V0aWxzL0NvbW1pdFV0aWwuamF2YQ==) | | | |
| [...e/hudi/common/engine/HoodieLocalEngineContext.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL2VuZ2luZS9Ib29kaWVMb2NhbEVuZ2luZUNvbnRleHQuamF2YQ==) | | | |
| [...a/org/apache/hudi/streamer/OperationConverter.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1mbGluay9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaHVkaS9zdHJlYW1lci9PcGVyYXRpb25Db252ZXJ0ZXIuamF2YQ==) | | | |
| [...di/common/table/log/block/HoodieAvroDataBlock.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL2xvZy9ibG9jay9Ib29kaWVBdnJvRGF0YUJsb2NrLmphdmE=) | | | |
| [...ava/org/apache/hudi/common/model/HoodieRecord.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL21vZGVsL0hvb2RpZVJlY29yZC5qYXZh) | | | |
| [...ache/hudi/common/util/collection/DiskBasedMap.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3V0aWwvY29sbGVjdGlvbi9EaXNrQmFzZWRNYXAuamF2YQ==) | | | |
| [...src/main/java/org/apache/hudi/cli/TableHeader.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jbGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY2xpL1RhYmxlSGVhZGVyLmphdmE=) | | | |
| [...g/apache/hudi/common/table/HoodieTableVersion.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL0hvb2RpZVRhYmxlVmVyc2lvbi5qYXZh) | | | |
| [.../versioning/clean/CleanPlanV2MigrationHandler.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL3RpbWVsaW5lL3ZlcnNpb25pbmcvY2xlYW4vQ2xlYW5QbGFuVjJNaWdyYXRpb25IYW5kbGVyLmphdmE=) | | | |
| ... and [371 more](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree-more) | |
[GitHub] [hudi] vinothchandar commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r571478862
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
##########
@@ -0,0 +1,68 @@
+
+package org.apache.spark.sql.hudi.streaming
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
Review comment:
This is very simple, right? Can we just hand-format the JSON without the Jackson dependency? Just a thought; leaving it to you.
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
##########
@@ -0,0 +1,68 @@
+
+package org.apache.spark.sql.hudi.streaming
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
+import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
+
+case class HoodieSourceOffset(commitTime: String) extends Offset {
+
+ override def json(): String = {
+ HoodieSourceOffset.toJson(this)
+ }
+
+ override def equals(obj: Any): Boolean = {
+ obj match {
+ case HoodieSourceOffset(otherCommitTime) =>
+ otherCommitTime == commitTime
+ case _=> false
+ }
+ }
+
+ override def hashCode(): Int = {
+ commitTime.hashCode
+ }
+}
+
+
+object HoodieSourceOffset {
+ val mapper = new ObjectMapper with ScalaObjectMapper
+ mapper.setSerializationInclusion(Include.NON_ABSENT)
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+ mapper.registerModule(DefaultScalaModule)
+
+ def toJson(offset: HoodieSourceOffset): String = {
+ mapper.writeValueAsString(offset)
+ }
+
+ def fromJson(json: String): HoodieSourceOffset = {
+ mapper.readValue[HoodieSourceOffset](json)
+ }
+
+ def apply(offset: Offset): HoodieSourceOffset = {
+ offset match {
+ case SerializedOffset(json) => fromJson(json)
+ case o: HoodieSourceOffset => o
+ }
+ }
+
+ val INIT_OFFSET = HoodieSourceOffset("000")
Review comment:
Should/can we reuse `HoodieTimeline#INIT_INSTANT_TS`?
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
##########
@@ -0,0 +1,185 @@
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieStreamSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+ * The Struct Stream Source for Hudi to consume the data by streaming job.
+ * @param sqlContext
+ * @param metadataPath
+ * @param schemaOption
+ * @param parameters
+ */
+class HoodieStreamSource(
+ sqlContext: SQLContext,
+ metadataPath: String,
+ schemaOption: Option[StructType],
+ parameters: Map[String, String])
+ extends Source with Logging with Serializable {
+
+ @transient private val hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+ private lazy val tablePath: Path = {
+ val path = new Path(parameters.getOrElse("path", "Missing 'path' option"))
+ val fs = path.getFileSystem(hadoopConf)
+ TablePathUtils.getTablePath(fs, path).get()
+ }
+ @transient private lazy val metaClient = new HoodieTableMetaClient(hadoopConf, tablePath.toString)
+ private lazy val tableType = metaClient.getTableType
+
+ @transient private var lastOffset: HoodieSourceOffset = _
+ @transient private lazy val initialPartitionOffsets = {
+ val metadataLog =
+ new HDFSMetadataLog[HoodieSourceOffset](sqlContext.sparkSession, metadataPath) {
+ override def serialize(metadata: HoodieSourceOffset, out: OutputStream): Unit = {
+ val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+ writer.write("v" + VERSION + "\n")
+ writer.write(metadata.json)
+ writer.flush()
+ }
+
+ override def deserialize(in: InputStream): HoodieSourceOffset = {
+ val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
+
+ val firstLineEnd = content.indexOf("\n")
+ if (firstLineEnd > 0) {
+ val version = getVersion(content.substring(0, firstLineEnd))
+ if (version > VERSION) {
+ throw new IllegalStateException(s"UnSupportVersion: max support version is: $VERSION" +
+ s" current version is: $version")
+ }
+ HoodieSourceOffset.fromJson(content.substring(firstLineEnd + 1))
+ } else {
+ throw new IllegalStateException(s"Bad metadata format, failed to find the version line.")
+ }
+ }
+ }
+ metadataLog.get(0).getOrElse {
+ metadataLog.add(0, INIT_OFFSET)
+ INIT_OFFSET
+ }
+ }
+
+ private def getVersion(versionLine: String): Int = {
+ if (versionLine.startsWith("v")) {
+ versionLine.substring(1).toInt
+ } else {
+ throw new IllegalStateException(s"Illegal version line: $versionLine " +
+ s"in the streaming metadata path")
+ }
+ }
+
+ override def schema: StructType = {
+ schemaOption.getOrElse {
+ val schemaUtil = new TableSchemaResolver(metaClient)
+ SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
+ .dataType.asInstanceOf[StructType]
+ }
+ }
+
+ override def getOffset: Option[Offset] = {
+ initialPartitionOffsets
+
+ metaClient.reloadActiveTimeline()
+ val activeInstants = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
+ if (!activeInstants.empty()) {
+ val currentLatestCommitTime = activeInstants.lastInstant().get().getTimestamp
+ if (lastOffset == null || currentLatestCommitTime > lastOffset.commitTime) {
+ lastOffset = HoodieSourceOffset(currentLatestCommitTime)
+ }
+ } else { // if there are no active commits, use the init offset
+ lastOffset = initialPartitionOffsets
+ }
+ Some(lastOffset)
+ }
+
+ override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+ initialPartitionOffsets
+
+ val startOffset = start.map(HoodieSourceOffset(_))
+ .getOrElse(initialPartitionOffsets)
+ val endOffset = HoodieSourceOffset(end)
+
+ if (startOffset == endOffset) {
+ sqlContext.internalCreateDataFrame(
Review comment:
nice.
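The serialize/deserialize pair quoted above frames the offset as a version line (`v<VERSION>`) followed by the offset JSON. A minimal stand-alone sketch of that framing (hypothetical class name, not the merged code):

```java
// Sketch of the "v<VERSION>\n<json>" framing used by the streaming
// metadata log: write prepends a version line, read validates and strips it.
public final class VersionedPayloadSketch {
    static final int VERSION = 1;

    public static String write(String offsetJson) {
        return "v" + VERSION + "\n" + offsetJson;
    }

    public static String read(String content) {
        int firstLineEnd = content.indexOf('\n');
        if (firstLineEnd <= 0) {
            throw new IllegalStateException("Bad metadata format, failed to find the version line");
        }
        String versionLine = content.substring(0, firstLineEnd);
        if (!versionLine.startsWith("v")) {
            throw new IllegalStateException("Illegal version line: " + versionLine);
        }
        int version = Integer.parseInt(versionLine.substring(1));
        if (version > VERSION) {
            throw new IllegalStateException("Unsupported version: " + version);
        }
        return content.substring(firstLineEnd + 1);
    }
}
```

The version prefix lets a newer reader reject metadata written by a future, incompatible format rather than silently misparsing it.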
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
##########
@@ -0,0 +1,185 @@
+ override def getOffset: Option[Offset] = {
+ initialPartitionOffsets
Review comment:
Couldn't this be done lazily in the else block, i.e. by removing this line?
##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkRowEncoder.java
##########
@@ -23,6 +23,8 @@
import java.io.Serializable;
-public interface SparkRowDeserializer extends Serializable {
+public interface SparkRowEncoder extends Serializable {
Review comment:
Maybe `SparkRowSerDe` is an apt name?
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
##########
@@ -0,0 +1,185 @@
+ metadataLog.get(0).getOrElse {
Review comment:
Some comments on this code would be helpful.
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
##########
@@ -0,0 +1,185 @@
+ override def getOffset: Option[Offset] = {
Review comment:
Just a rant: `Source#getOffset()` is such a bad name. It's actually the latest offset. :(
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
##########
@@ -0,0 +1,185 @@
+ override def deserialize(in: InputStream): HoodieSourceOffset = {
+ val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
Review comment:
`FileIOUtils` etc. have a similar method.
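For illustration, reading an InputStream fully into a UTF-8 String needs only the JDK, along the lines of what an in-house utility could provide (hypothetical class and method names; not the actual Hudi `FileIOUtils` API):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Sketch of a commons-io-free stream-to-string helper: drain the stream
// into a byte buffer in fixed-size chunks, then decode as UTF-8.
public final class StreamReadSketch {
    public static String readAsUtf8String(InputStream in) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            byte[] chunk = new byte[8192];
            int n;
            while ((n = in.read(chunk)) != -1) {
                buf.write(chunk, 0, n);
            }
            return new String(buf.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Wrapping the checked `IOException` in `UncheckedIOException` keeps call sites tidy, which matters when the helper is used inside overridden methods that cannot widen their throws clause.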
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
##########
@@ -0,0 +1,69 @@
+
+package org.apache.spark.sql.hudi.streaming
Review comment:
Yeah, I had the same initial thoughts, but Spark does not expose some key things like this, so we have to resort to these tricks. Other projects do it too, AFAIK.
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
##########
@@ -0,0 +1,185 @@
+ @transient private lazy val metaClient = new HoodieTableMetaClient(hadoopConf, tablePath.toString)
Review comment:
This does serialize well, actually.
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
##########
@@ -0,0 +1,185 @@
+import org.apache.commons.io.IOUtils
Review comment:
Please use the Hudi version of IOUtils. We need the same checkstyle rules applied for Scala; apache-commons is an illegal import in Java code.
----------------------------------------------------------------
[GitHub] [hudi] pengzhiwei2018 edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-775607006
Hi @vinothchandar , all the comments have been addressed. Please take another look when you have time. I have also filed a JIRA to support record-level streaming consumption: [HUDI-1601](https://issues.apache.org/jira/browse/HUDI-1601).
[GitHub] [hudi] vinothchandar merged pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
vinothchandar merged pull request #2485:
URL: https://github.com/apache/hudi/pull/2485
[GitHub] [hudi] ze-engineering-code-challenge removed a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
ze-engineering-code-challenge removed a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-781758598
Out of curiosity: what will happen if I recreate the source table of a streaming job?
For example, I have a streaming job that uses tableA as the source and tableB as the sink. If for some reason I reprocess tableA, what will happen when Spark streaming tries to read tableA?
I will try it out.
@pengzhiwei2018
[GitHub] [hudi] pengzhiwei2018 edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-772986092
> @pengzhiwei2018 I am planning to spend some time on this as well.
>
> High level question: does the `offset` for the streaming read map to `_hoodie_commit_seq_no` in this implementation? This way we can actually do record-level streams and even resume where we left off.
Hi @vinothchandar , you are welcome to join this.
Currently the `HoodieSourceOffset` only keeps the `commitTime`. In every micro-batch we consume the incremental data in `(lastCommitTime, currentCommitTime]`. If the job fails during consumption, it recovers from the offset state and continues consuming the data in `(lastCommitTime, currentCommitTime]`, so recovery is at the commit level.
Introducing `_hoodie_commit_seq_no` into the `offset` could make recovery more fine-grained, down to the record level. The problem is how to know the max `commit_seq_no` in a commit: in the `getOffset` method we must tell Spark which `commit_seq_no` the micro-batch will read up to, but currently the Hudi metadata only records the commit time for each commit, so slicing the offset is a problem.
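The commit-level semantics described above can be sketched as follows. This is a simplified, Spark-free illustration: the `HoodieSourceOffset` name mirrors the PR, but the timeline and range logic here are hypothetical stand-ins, relying only on Hudi commit times being lexicographically ordered `yyyyMMddHHmmss` strings.

```scala
// Simplified sketch, not the actual implementation: an offset holds only
// the last consumed commit time, and each micro-batch reads the commits
// in the half-open range (lastCommitTime, currentCommitTime].
case class HoodieSourceOffset(commitTime: String)

object CommitRangeDemo {
  // Hudi commit times are yyyyMMddHHmmss strings, so lexicographic
  // comparison matches timeline order.
  def commitsInRange(timeline: Seq[String],
                     last: HoodieSourceOffset,
                     current: HoodieSourceOffset): Seq[String] =
    timeline.filter(t => t > last.commitTime && t <= current.commitTime)
}
```

On restart after a failure, the recovered offset pair yields exactly the same `(lastCommitTime, currentCommitTime]` range, which is what makes the recovery commit-level rather than record-level.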
[GitHub] [hudi] rubenssoto commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
rubenssoto commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-781767655
@pengzhiwei2018
No. For example, today with Spark Structured Streaming on a regular Parquet table, if tableA is the source of my streaming job and I reprocess/recreate tableA, Spark streaming will process all the new files of the reprocessed tableA.
If for any reason I need to recreate my tableA, what will happen to my streams?
I don't know if I am making myself clear.
[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r566178932
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieStreamSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+ * The Structured Streaming Source for Hudi, used to consume table data in a streaming job.
+ * @param sqlContext the Spark SQLContext
+ * @param metadataPath the path where the streaming offset metadata is stored
+ * @param schemaOption the optional user-specified schema
+ * @param parameters the data source options; must contain the "path" option
+ */
+class HoodieStreamSource(
+ sqlContext: SQLContext,
+ metadataPath: String,
+ schemaOption: Option[StructType],
+ parameters: Map[String, String])
+ extends Source with Logging with Serializable {
+
+ @transient private val hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+ private lazy val tablePath: Path = {
+ val path = new Path(parameters.getOrElse("path", throw new IllegalArgumentException("Missing 'path' option")))
+ val fs = path.getFileSystem(hadoopConf)
+ TablePathUtils.getTablePath(fs, path).get()
+ }
+ @transient private lazy val metaClient = new HoodieTableMetaClient(hadoopConf, tablePath.toString)
+ private lazy val tableType = metaClient.getTableType
+
+ @transient private var lastOffset: HoodieSourceOffset = _
+ @transient private lazy val initialPartitionOffsets = {
+ val metadataLog =
+ new HDFSMetadataLog[HoodieSourceOffset](sqlContext.sparkSession, metadataPath) {
Review comment:
Yes, it is a Spark implementation, and it uses the FileSystem API calls. S3 is compatible with the HDFS FileSystem API: https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.5/bk_cloud-data-access/content/s3-get-started.html
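As a pure-JVM sketch of why this works: Hadoop's `FileSystem` API dispatches to a concrete implementation based on the URI scheme, which is why identical calling code covers HDFS, S3 (via `s3a`), and the local file system. The class names below are the real Hadoop implementations, but this toy function only illustrates the dispatch; it is not Hadoop code.

```scala
import java.net.URI

// Toy illustration of scheme-based dispatch (the real resolution happens
// inside Hadoop's FileSystem.get): the caller never branches on storage type.
object SchemeDispatchDemo {
  def implFor(path: String): String = new URI(path).getScheme match {
    case "hdfs" => "DistributedFileSystem" // HDFS
    case "s3a"  => "S3AFileSystem"         // S3, via the hadoop-aws module
    case "file" => "LocalFileSystem"       // local disk
    case other  => s"unsupported scheme: $other"
  }
}
```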
[GitHub] [hudi] codecov-io edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766519181
# [Codecov](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=h1) Report
> Merging [#2485](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=desc) (64fb20b) into [master](https://codecov.io/gh/apache/hudi/commit/5d053b495b8cb44cce88a67e82cbdfdc3d8b3180?el=desc) (5d053b4) will **increase** coverage by `0.85%`.
> The diff coverage is `67.52%`.
[![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2485/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree)
```diff
@@ Coverage Diff @@
## master #2485 +/- ##
============================================
+ Coverage 49.78% 50.63% +0.85%
- Complexity 3089 3147 +58
============================================
Files 430 432 +2
Lines 19566 19683 +117
Branches 2004 2019 +15
============================================
+ Hits 9741 9967 +226
+ Misses 9033 8893 -140
- Partials 792 823 +31
```
| Flag | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| hudicli | `37.21% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudiclient | `100.00% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudicommon | `51.49% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudiflink | `33.03% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudihadoopmr | `33.16% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudisparkdatasource | `69.42% <67.52%> (-0.09%)` | `0.00 <27.00> (ø)` | |
| hudisync | `48.61% <ø> (ø)` | `0.00 <ø> (ø)` | |
| huditimelineservice | `66.49% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudiutilities | `69.43% <ø> (+7.56%)` | `0.00 <ø> (ø)` | |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [.../spark/sql/hudi/streaming/HoodieSourceOffset.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9zcGFyay9zcWwvaHVkaS9zdHJlYW1pbmcvSG9vZGllU291cmNlT2Zmc2V0LnNjYWxh) | `63.15% <63.15%> (ø)` | `4.00 <4.00> (?)` | |
| [...src/main/scala/org/apache/hudi/DefaultSource.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9odWRpL0RlZmF1bHRTb3VyY2Uuc2NhbGE=) | `84.21% <64.28%> (-4.50%)` | `17.00 <2.00> (+2.00)` | :arrow_down: |
| [.../spark/sql/hudi/streaming/HoodieStreamSource.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9zcGFyay9zcWwvaHVkaS9zdHJlYW1pbmcvSG9vZGllU3RyZWFtU291cmNlLnNjYWxh) | `69.04% <69.04%> (ø)` | `21.00 <21.00> (?)` | |
| [...g/apache/hudi/MergeOnReadIncrementalRelation.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9odWRpL01lcmdlT25SZWFkSW5jcmVtZW50YWxSZWxhdGlvbi5zY2FsYQ==) | `83.05% <0.00%> (+0.84%)` | `22.00% <0.00%> (+1.00%)` | |
| [...in/java/org/apache/hudi/utilities/UtilHelpers.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL1V0aWxIZWxwZXJzLmphdmE=) | `64.16% <0.00%> (+1.15%)` | `33.00% <0.00%> (+1.00%)` | |
| [...i/utilities/deltastreamer/HoodieDeltaStreamer.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2RlbHRhc3RyZWFtZXIvSG9vZGllRGVsdGFTdHJlYW1lci5qYXZh) | `68.97% <0.00%> (+1.22%)` | `18.00% <0.00%> (+1.00%)` | |
| [...ties/deltastreamer/HoodieDeltaStreamerMetrics.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2RlbHRhc3RyZWFtZXIvSG9vZGllRGVsdGFTdHJlYW1lck1ldHJpY3MuamF2YQ==) | `36.11% <0.00%> (+2.77%)` | `6.00% <0.00%> (+1.00%)` | |
| [...utilities/deltastreamer/TableExecutionContext.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2RlbHRhc3RyZWFtZXIvVGFibGVFeGVjdXRpb25Db250ZXh0LmphdmE=) | `65.00% <0.00%> (+65.00%)` | `9.00% <0.00%> (+9.00%)` | |
| ... and [1 more](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree-more) | |
[GitHub] [hudi] vinothchandar commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-772797323
@pengzhiwei2018 I am planning to spend some time on this as well.
High level question: does the `offset` for the streaming read map to `_hoodie_commit_seq_no` in this implementation? This way we can actually do record-level streams and even resume.
[GitHub] [hudi] vinothchandar commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766593559
cc @garyli1019 mind taking a first pass at this PR? :)
[GitHub] [hudi] codecov-io edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766519181
# [Codecov](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=h1) Report
> Merging [#2485](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=desc) (74974ac) into [master](https://codecov.io/gh/apache/hudi/commit/5d053b495b8cb44cce88a67e82cbdfdc3d8b3180?el=desc) (5d053b4) will **increase** coverage by `0.86%`.
> The diff coverage is `67.52%`.
[![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2485/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree)
```diff
@@ Coverage Diff @@
## master #2485 +/- ##
============================================
+ Coverage 49.78% 50.65% +0.86%
- Complexity 3089 3148 +59
============================================
Files 430 432 +2
Lines 19566 19683 +117
Branches 2004 2019 +15
============================================
+ Hits 9741 9970 +229
+ Misses 9033 8892 -141
- Partials 792 821 +29
```
| Flag | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| hudicli | `37.21% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudiclient | `100.00% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudicommon | `51.51% <ø> (+0.02%)` | `0.00 <ø> (ø)` | |
| hudiflink | `33.03% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudihadoopmr | `33.16% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudisparkdatasource | `69.42% <67.52%> (-0.09%)` | `0.00 <27.00> (ø)` | |
| hudisync | `48.61% <ø> (ø)` | `0.00 <ø> (ø)` | |
| huditimelineservice | `66.49% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudiutilities | `69.48% <ø> (+7.61%)` | `0.00 <ø> (ø)` | |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [.../spark/sql/hudi/streaming/HoodieSourceOffset.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9zcGFyay9zcWwvaHVkaS9zdHJlYW1pbmcvSG9vZGllU291cmNlT2Zmc2V0LnNjYWxh) | `63.15% <63.15%> (ø)` | `4.00 <4.00> (?)` | |
| [...src/main/scala/org/apache/hudi/DefaultSource.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9odWRpL0RlZmF1bHRTb3VyY2Uuc2NhbGE=) | `84.21% <64.28%> (-4.50%)` | `17.00 <2.00> (+2.00)` | :arrow_down: |
| [.../spark/sql/hudi/streaming/HoodieStreamSource.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9zcGFyay9zcWwvaHVkaS9zdHJlYW1pbmcvSG9vZGllU3RyZWFtU291cmNlLnNjYWxh) | `69.04% <69.04%> (ø)` | `21.00 <21.00> (?)` | |
| [...apache/hudi/utilities/deltastreamer/DeltaSync.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2RlbHRhc3RyZWFtZXIvRGVsdGFTeW5jLmphdmE=) | `70.86% <0.00%> (+0.35%)` | `51.00% <0.00%> (+1.00%)` | |
| [...g/apache/hudi/MergeOnReadIncrementalRelation.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9odWRpL01lcmdlT25SZWFkSW5jcmVtZW50YWxSZWxhdGlvbi5zY2FsYQ==) | `83.05% <0.00%> (+0.84%)` | `22.00% <0.00%> (+1.00%)` | |
| [...in/java/org/apache/hudi/utilities/UtilHelpers.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL1V0aWxIZWxwZXJzLmphdmE=) | `64.16% <0.00%> (+1.15%)` | `33.00% <0.00%> (+1.00%)` | |
| [...i/utilities/deltastreamer/HoodieDeltaStreamer.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2RlbHRhc3RyZWFtZXIvSG9vZGllRGVsdGFTdHJlYW1lci5qYXZh) | `68.97% <0.00%> (+1.22%)` | `18.00% <0.00%> (+1.00%)` | |
| [...e/hudi/common/table/log/HoodieLogFormatWriter.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL2xvZy9Ib29kaWVMb2dGb3JtYXRXcml0ZXIuamF2YQ==) | `79.68% <0.00%> (+1.56%)` | `26.00% <0.00%> (ø%)` | |
| ... and [3 more](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree-more) | |
[GitHub] [hudi] pengzhiwei2018 edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-768221064
> Can you check if this change is compatible with Spark 3.0.0?
Hi @zhedoubushishi , the `Source` and `Offset` implementations are still available in Spark 3.0.0, but they need to be integrated into the new `SourceProvider` before they can be used in 3.0.0.
[GitHub] [hudi] pengzhiwei2018 edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-772986092
> @pengzhiwei2018 I am planning to spend some time on this as well.
>
> High level question: does the `offset` for the streaming read map to `_hoodie_commit_seq_no` in this implementation? This way we can actually do record-level streams and even resume where we left off.
Hi @vinothchandar , welcome to join this.
Currently the `HoodieSourceOffset` only keeps the `commitTime`. In every micro-batch we consume the incremental data in `(lastCommitTime, currentCommitTime]`. If the job fails during consumption, it recovers from the offset state and continues consuming the data in `(lastCommitTime, currentCommitTime]`, so recovery is at the commit level.
Introducing `_hoodie_commit_seq_no` into the `offset` could make recovery more fine-grained, down to the record level. The problem is how to know the max `commit_seq_no` in a commit: in the `getOffset` method we must tell Spark which `commit_seq_no` the micro-batch will read up to, but currently the Hudi metadata only records the commit time for each commit, so slicing the offset is a problem.
[GitHub] [hudi] pengzhiwei2018 commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-774584018
> > how can we know the max commit_seq_no in the commit
>
> I think we should do what would be done in Kafka's case, or just use an accumulator to obtain this on each commit. Either way, let's file a follow-up JIRA to allow record-level streaming. We can do it in a follow-up.
Yes, I agree to file a JIRA to allow record-level streaming!
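The accumulator idea could be sketched like this; the `_hoodie_commit_seq_no` layout assumed here (`<commitTime>_<partitionId>_<rowId>`) and the helper names are illustrative, not the planned implementation.

```scala
// Hypothetical sketch: to slice a record-level offset we would need the
// max sequence number written in each commit. Comparing on the numeric
// rowId (rather than lexicographically) avoids "10" sorting before "2".
object SeqNoDemo {
  // Assumed seq-no layout: "<commitTime>_<partitionId>_<rowId>"
  private def rowId(seqNo: String): Long = seqNo.split("_").last.toLong

  def maxSeqNo(seqNosInCommit: Seq[String]): Option[String] =
    seqNosInCommit.reduceOption((a, b) => if (rowId(a) >= rowId(b)) a else b)
}
```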
[GitHub] [hudi] zhedoubushishi commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
zhedoubushishi commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r568854277
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
+import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}
Review comment:
If it's only compatible with Spark 2, we should leave it in ```hudi-spark2```. If it's compatible with both Spark 2 and Spark 3, we should leave it in ```hudi-spark```.
My understanding is that at a minimum you need to make this change compile with ```mvn package -DskipTests -Dspark3```. Otherwise Hudi can no longer compile with Spark 3.
[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r571525627
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
+import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}
Review comment:
done for this!
[GitHub] [hudi] pengzhiwei2018 commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-768221064
> Can you check if this change is compatible with Spark 3.0.0?
Hi @zhedoubushishi , the `Source` and `Offset` interfaces are still available in Spark 3.0.0, but they need to be integrated into the new `SourceProvider` before they can be used in 3.0.0.
[GitHub] [hudi] codecov-io edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766519181
# [Codecov](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=h1) Report
> Merging [#2485](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=desc) (8d2ff66) into [master](https://codecov.io/gh/apache/hudi/commit/e302c6bc12c7eb764781898fdee8ee302ef4ec10?el=desc) (e302c6b) will **decrease** coverage by `40.49%`.
> The diff coverage is `n/a`.
[![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2485/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree)
```diff
@@ Coverage Diff @@
## master #2485 +/- ##
============================================
- Coverage 50.18% 9.68% -40.50%
+ Complexity 3050 48 -3002
============================================
Files 419 53 -366
Lines 18931 1930 -17001
Branches 1948 230 -1718
============================================
- Hits 9500 187 -9313
+ Misses 8656 1730 -6926
+ Partials 775 13 -762
```
| Flag | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| hudicli | `?` | `?` | |
| hudiclient | `?` | `?` | |
| hudicommon | `?` | `?` | |
| hudiflink | `?` | `?` | |
| hudihadoopmr | `?` | `?` | |
| hudisparkdatasource | `?` | `?` | |
| hudisync | `?` | `?` | |
| huditimelineservice | `?` | `?` | |
| hudiutilities | `9.68% <ø> (-59.75%)` | `0.00 <ø> (ø)` | |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
| [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
| [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
| [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
| [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
| [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-10.00%)` | |
| [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
| [...apache/hudi/utilities/sources/JsonKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkthZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-6.00%)` | |
| [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-5.00%)` | |
| [...lities/schema/SchemaProviderWithPostProcessor.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQcm92aWRlcldpdGhQb3N0UHJvY2Vzc29yLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
| ... and [395 more](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree-more) | |
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [hudi] codecov-io commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766519181
# [Codecov](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=h1) Report
> Merging [#2485](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=desc) (91cf083) into [master](https://codecov.io/gh/apache/hudi/commit/e302c6bc12c7eb764781898fdee8ee302ef4ec10?el=desc) (e302c6b) will **decrease** coverage by `40.49%`.
> The diff coverage is `n/a`.
[![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2485/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree)
```diff
@@             Coverage Diff              @@
##             master    #2485       +/-   ##
============================================
- Coverage     50.18%    9.68%    -40.50%
+ Complexity     3050       48      -3002
============================================
  Files           419       53       -366
  Lines         18931     1930     -17001
  Branches       1948      230      -1718
============================================
- Hits           9500      187      -9313
+ Misses         8656     1730      -6926
+ Partials        775       13       -762
```
| Flag | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| hudicli | `?` | `?` | |
| hudiclient | `?` | `?` | |
| hudicommon | `?` | `?` | |
| hudiflink | `?` | `?` | |
| hudihadoopmr | `?` | `?` | |
| hudisparkdatasource | `?` | `?` | |
| hudisync | `?` | `?` | |
| huditimelineservice | `?` | `?` | |
| hudiutilities | `9.68% <ø> (-59.75%)` | `0.00 <ø> (ø)` | |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
| [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
| [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
| [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
| [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
| [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-10.00%)` | |
| [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
| [...apache/hudi/utilities/sources/JsonKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkthZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-6.00%)` | |
| [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-5.00%)` | |
| [...lities/schema/SchemaProviderWithPostProcessor.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQcm92aWRlcldpdGhQb3N0UHJvY2Vzc29yLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
| ... and [395 more](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree-more) | |
----------------------------------------------------------------
[GitHub] [hudi] zhedoubushishi commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
zhedoubushishi commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-768083458
Can you check if this change is compatible with Spark 3.0.0?
----------------------------------------------------------------
[GitHub] [hudi] codecov-io edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766519181
# [Codecov](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=h1) Report
> Merging [#2485](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=desc) (fa0056e) into [master](https://codecov.io/gh/apache/hudi/commit/e302c6bc12c7eb764781898fdee8ee302ef4ec10?el=desc) (e302c6b) will **increase** coverage by `0.10%`.
> The diff coverage is `67.52%`.
[![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2485/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree)
```diff
@@             Coverage Diff              @@
##             master    #2485      +/-   ##
============================================
+ Coverage     50.18%   50.28%     +0.10%
- Complexity     3050     3078        +28
============================================
  Files           419      421         +2
  Lines         18931    19048       +117
  Branches       1948     1963        +15
============================================
+ Hits           9500     9578        +78
- Misses         8656     8676        +20
- Partials        775      794        +19
```
| Flag | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| hudicli | `37.21% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudiclient | `100.00% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudicommon | `51.47% <ø> (-0.03%)` | `0.00 <ø> (ø)` | |
| hudiflink | `0.00% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudihadoopmr | `33.16% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudisparkdatasource | `66.05% <67.52%> (+0.19%)` | `0.00 <27.00> (ø)` | |
| hudisync | `48.61% <ø> (ø)` | `0.00 <ø> (ø)` | |
| huditimelineservice | `66.49% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudiutilities | `69.43% <ø> (ø)` | `0.00 <ø> (ø)` | |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [.../spark/sql/hudi/streaming/HoodieSourceOffset.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9zcGFyay9zcWwvaHVkaS9zdHJlYW1pbmcvSG9vZGllU291cmNlT2Zmc2V0LnNjYWxh) | `63.15% <63.15%> (ø)` | `4.00 <4.00> (?)` | |
| [...src/main/scala/org/apache/hudi/DefaultSource.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9odWRpL0RlZmF1bHRTb3VyY2Uuc2NhbGE=) | `84.21% <64.28%> (-4.50%)` | `17.00 <2.00> (+2.00)` | :arrow_down: |
| [...apache/spark/sql/hudi/streaming/HoodieSource.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9zcGFyay9zcWwvaHVkaS9zdHJlYW1pbmcvSG9vZGllU291cmNlLnNjYWxh) | `69.04% <69.04%> (ø)` | `21.00 <21.00> (?)` | |
| [...e/hudi/common/table/log/HoodieLogFormatWriter.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL2xvZy9Ib29kaWVMb2dGb3JtYXRXcml0ZXIuamF2YQ==) | `78.12% <0.00%> (-1.57%)` | `26.00% <0.00%> (ø%)` | |
| [...g/apache/hudi/MergeOnReadIncrementalRelation.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9odWRpL01lcmdlT25SZWFkSW5jcmVtZW50YWxSZWxhdGlvbi5zY2FsYQ==) | `83.05% <0.00%> (+0.84%)` | `22.00% <0.00%> (+1.00%)` | |
----------------------------------------------------------------
[GitHub] [hudi] codecov-io edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766519181
# [Codecov](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=h1) Report
> Merging [#2485](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=desc) (91cf083) into [master](https://codecov.io/gh/apache/hudi/commit/e302c6bc12c7eb764781898fdee8ee302ef4ec10?el=desc) (e302c6b) will **increase** coverage by `19.24%`.
> The diff coverage is `n/a`.
[![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2485/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree)
```diff
@@              Coverage Diff              @@
##             master    #2485       +/-   ##
=============================================
+ Coverage     50.18%   69.43%    +19.24%
+ Complexity     3050      357      -2693
=============================================
  Files           419       53       -366
  Lines         18931     1930     -17001
  Branches       1948      230      -1718
=============================================
- Hits           9500     1340      -8160
+ Misses         8656      456      -8200
+ Partials        775      134       -641
```
| Flag | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| hudicli | `?` | `?` | |
| hudiclient | `?` | `?` | |
| hudicommon | `?` | `?` | |
| hudiflink | `?` | `?` | |
| hudihadoopmr | `?` | `?` | |
| hudisparkdatasource | `?` | `?` | |
| hudisync | `?` | `?` | |
| huditimelineservice | `?` | `?` | |
| hudiutilities | `69.43% <ø> (ø)` | `0.00 <ø> (ø)` | |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [...src/main/java/org/apache/hudi/dla/DLASyncTool.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zeW5jL2h1ZGktZGxhLXN5bmMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZGxhL0RMQVN5bmNUb29sLmphdmE=) | | | |
| [...rg/apache/hudi/metadata/HoodieMetadataPayload.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvbWV0YWRhdGEvSG9vZGllTWV0YWRhdGFQYXlsb2FkLmphdmE=) | | | |
| [...rg/apache/hudi/common/fs/inline/InLineFSUtils.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL2ZzL2lubGluZS9JbkxpbmVGU1V0aWxzLmphdmE=) | | | |
| [...pache/hudi/metadata/HoodieBackedTableMetadata.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvbWV0YWRhdGEvSG9vZGllQmFja2VkVGFibGVNZXRhZGF0YS5qYXZh) | | | |
| [...g/apache/hudi/MergeOnReadIncrementalRelation.scala](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9odWRpL01lcmdlT25SZWFkSW5jcmVtZW50YWxSZWxhdGlvbi5zY2FsYQ==) | | | |
| [...udi/common/util/queue/BoundedInMemoryExecutor.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3V0aWwvcXVldWUvQm91bmRlZEluTWVtb3J5RXhlY3V0b3IuamF2YQ==) | | | |
| [...oop/realtime/HoodieParquetRealtimeInputFormat.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1oYWRvb3AtbXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvaGFkb29wL3JlYWx0aW1lL0hvb2RpZVBhcnF1ZXRSZWFsdGltZUlucHV0Rm9ybWF0LmphdmE=) | | | |
| [.../hive/SlashEncodedHourPartitionValueExtractor.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zeW5jL2h1ZGktaGl2ZS1zeW5jL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9odWRpL2hpdmUvU2xhc2hFbmNvZGVkSG91clBhcnRpdGlvblZhbHVlRXh0cmFjdG9yLmphdmE=) | | | |
| [...va/org/apache/hudi/common/model/HoodieLogFile.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL21vZGVsL0hvb2RpZUxvZ0ZpbGUuamF2YQ==) | | | |
| [.../src/main/java/org/apache/hudi/dla/util/Utils.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS1zeW5jL2h1ZGktZGxhLXN5bmMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZGxhL3V0aWwvVXRpbHMuamF2YQ==) | | | |
| ... and [355 more](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree-more) | |
----------------------------------------------------------------
[GitHub] [hudi] zhedoubushishi commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
zhedoubushishi commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r565508953
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
Review comment:
Do we need to add ```org.apache.spark``` as the prefix? Shall we just use ```org.apache.hudi.streaming```?
##########
File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY}
+import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.config.HoodieWriteConfig.{DELETE_PARALLELISM, INSERT_PARALLELISM, TABLE_NAME, UPSERT_PARALLELISM}
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.{Row, SaveMode}
+
+class TestStreamingSource extends StreamTest {
Review comment:
[nit] duplicated space
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
+import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}
Review comment:
This class is not compatible with Spark 3. I am wondering whether we could move this class (or any class that is not compatible with Spark 3) to the ```hudi-spark2``` module. In the long term we should make this change compatible with both Spark 2 and Spark 3, but I think you can create a JIRA to track that for now.
You may run ```mvn clean install -Dspark3 -DskipTests``` to check whether your change works with Spark 3.
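One common way to sidestep such a version split — sketched here with illustrative names, not the PR's actual code — is to keep the offset state in a plain case class with a JSON representation, so that only a thin per-Spark-version adapter has to extend the version-specific `Offset` base class:

```scala
// Illustrative sketch: the offset payload itself has no Spark dependency.
// Only a small adapter per Spark version needs to extend that version's
// Offset base class and delegate to `json`.
case class HoodieOffsetState(commitTime: String) {
  // Hand-rolled JSON to keep the sketch dependency-free; the PR uses Jackson.
  def json: String = s"""{"commitTime": "$commitTime"}"""
}

object HoodieOffsetState {
  private val Pattern = """\{"commitTime": "(.*)"\}""".r

  def fromJson(json: String): HoodieOffsetState = json match {
    case Pattern(time) => HoodieOffsetState(time)
    case _ => throw new IllegalArgumentException(s"Bad offset json: $json")
  }
}
```

With this split, the serialized form stored in the checkpoint stays identical across Spark versions, and only the adapter classes differ per module.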
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieStreamSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+ * The Structured Streaming source for Hudi, which lets streaming jobs consume data from a Hudi table.
+ * @param sqlContext
+ * @param metadataPath
+ * @param schemaOption
+ * @param parameters
+ */
+class HoodieStreamSource(
+ sqlContext: SQLContext,
+ metadataPath: String,
+ schemaOption: Option[StructType],
+ parameters: Map[String, String])
+ extends Source with Logging with Serializable {
+
+ @transient private val hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+ private lazy val tablePath: Path = {
+ val path = new Path(parameters.getOrElse("path", "Missing 'path' option"))
+ val fs = path.getFileSystem(hadoopConf)
+ TablePathUtils.getTablePath(fs, path).get()
+ }
+ @transient private lazy val metaClient = new HoodieTableMetaClient(hadoopConf, tablePath.toString)
+ private lazy val tableType = metaClient.getTableType
+
+ @transient private var lastOffset: HoodieSourceOffset = _
+ @transient private lazy val initialPartitionOffsets = {
+ val metadataLog =
+ new HDFSMetadataLog[HoodieSourceOffset](sqlContext.sparkSession, metadataPath) {
Review comment:
Just for my understanding: is ```HDFSMetadataLog``` also compatible with other DFS implementations like S3?
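For context: `HDFSMetadataLog` goes through the Hadoop `FileSystem` abstraction rather than HDFS APIs directly, so any store with a Hadoop connector (HDFS, `s3a://`, etc.) can generally back it; the usual caveat on object stores is the lack of atomic rename. A minimal illustration of that scheme-based resolution (the bucket and path are made up):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// The URI scheme of the path decides which FileSystem implementation Hadoop
// loads, so the same metadata-log code runs against HDFS or S3 unchanged.
val checkpoint = new Path("s3a://my-bucket/checkpoints/sources/0")
val fs = checkpoint.getFileSystem(new Configuration())
```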
----------------------------------------------------------------
[GitHub] [hudi] codecov-io edited a comment on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-766519181
# [Codecov](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=h1) Report
> Merging [#2485](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=desc) (55b8bc8) into [master](https://codecov.io/gh/apache/hudi/commit/e302c6bc12c7eb764781898fdee8ee302ef4ec10?el=desc) (e302c6b) will **decrease** coverage by `40.49%`.
> The diff coverage is `n/a`.
[![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2485/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree)
```diff
@@             Coverage Diff              @@
##             master    #2485       +/-   ##
============================================
- Coverage     50.18%    9.68%    -40.50%
+ Complexity     3050       48      -3002
============================================
  Files           419       53       -366
  Lines         18931     1930     -17001
  Branches       1948      230      -1718
============================================
- Hits           9500      187      -9313
+ Misses         8656     1730      -6926
+ Partials        775       13       -762
```
| Flag | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| hudicli | `?` | `?` | |
| hudiclient | `?` | `?` | |
| hudicommon | `?` | `?` | |
| hudiflink | `?` | `?` | |
| hudihadoopmr | `?` | `?` | |
| hudisparkdatasource | `?` | `?` | |
| hudisync | `?` | `?` | |
| huditimelineservice | `?` | `?` | |
| hudiutilities | `9.68% <ø> (-59.75%)` | `0.00 <ø> (ø)` | |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2485?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
| [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
| [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
| [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
| [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
| [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-10.00%)` | |
| [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
| [...apache/hudi/utilities/sources/JsonKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkthZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-6.00%)` | |
| [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-5.00%)` | |
| [...lities/schema/SchemaProviderWithPostProcessor.java](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQcm92aWRlcldpdGhQb3N0UHJvY2Vzc29yLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
| ... and [395 more](https://codecov.io/gh/apache/hudi/pull/2485/diff?src=pr&el=tree-more) | |
----------------------------------------------------------------
[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r566165153
##########
File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY}
+import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.config.HoodieWriteConfig.{DELETE_PARALLELISM, INSERT_PARALLELISM, TABLE_NAME, UPSERT_PARALLELISM}
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.{Row, SaveMode}
+
+class TestStreamingSource extends StreamTest {
Review comment:
Thanks for the correction!
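For context, the kind of streaming read that `TestStreamingSource` exercises can be sketched roughly as follows. This is an illustrative sketch only, not code from the PR: the short format name `"hudi"` and the table path are assumptions, and in the actual test the table would first be written to a temp directory with the Hudi datasource.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object StreamingReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hudi-streaming-read-sketch")
      .master("local[2]")
      .getOrCreate()

    // Hypothetical table path; assumed to already contain Hudi commits.
    val tablePath = "/tmp/hudi/test_table"

    // Read the Hudi table as a streaming source. The format resolves to
    // DefaultSource, whose sourceSchema/createSource methods this PR adds;
    // each new commit on the table becomes a micro-batch.
    val df = spark.readStream
      .format("hudi")
      .load(tablePath)

    // Echo each micro-batch to the console for inspection.
    val query = df.writeStream
      .format("console")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()

    query.awaitTermination()
  }
}
```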
[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
pengzhiwei2018 commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r565127786
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
##########
@@ -181,4 +184,35 @@ class DefaultSource extends RelationProvider
.resolveRelation()
}
}
+
+ override def sourceSchema(sqlContext: SQLContext,
+ schema: Option[StructType],
+ providerName: String,
+ parameters: Map[String, String]): (String, StructType) = {
+ val path = parameters.get("path")
+ if (path.isEmpty || path.get == null) {
+ throw new HoodieException(s"'path' must be specified.")
+ }
+ val metaClient = new HoodieTableMetaClient(
+ sqlContext.sparkSession.sessionState.newHadoopConf(), path.get)
+ val schemaResolver = new TableSchemaResolver(metaClient)
+ val sqlSchema =
+ try {
+ val avroSchema = schemaResolver.getTableAvroSchema
+ SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
Review comment:
That's all right. I will give it a try.
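The schema-resolution path discussed above can be sketched like this: resolve the table's latest Avro schema from commit metadata, then convert it to a Catalyst `StructType` via spark-avro's `SchemaConverters`. This paraphrases the diff for readability rather than quoting the final merged code, and omits the fallback handling the review goes on to discuss.

```scala
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.exception.HoodieException
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.StructType

// Resolve the table's schema for the streaming source, roughly as
// DefaultSource.sourceSchema does in the diff above.
def resolveStreamingSchema(sqlContext: SQLContext,
                           parameters: Map[String, String]): StructType = {
  // A Hudi source is addressed by its base path; fail fast if it is missing.
  val path = parameters.getOrElse("path",
    throw new HoodieException("'path' must be specified."))
  val metaClient = new HoodieTableMetaClient(
    sqlContext.sparkSession.sessionState.newHadoopConf(), path)
  // Read the Avro schema recorded in the latest commit metadata.
  val avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema
  // Map the Avro record schema to Spark's Catalyst type system.
  SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
}
```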
[GitHub] [hudi] rubenssoto commented on pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table
Posted by GitBox <gi...@apache.org>.
rubenssoto commented on pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#issuecomment-781759949
Out of curiosity: what happens if I recreate the source table of a streaming query?
For example, suppose I have tableA, plus a streaming query that uses tableA as the source and tableB as the sink. If for some reason I reprocess tableA, what happens when Spark Structured Streaming tries to read tableA?
I will give it a try.
@pengzhiwei2018
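The answer hinges on how the source tracks progress: per the PR description, `HoodieSourceOffset` stores the latest commit time that has been read, and Hudi commit times are timestamp strings that order lexicographically. The toy model below (plain Scala, hypothetical names, not Hudi's actual classes) illustrates why recreating the source table interacts badly with a checkpointed offset: the checkpoint still holds the old commit time, so rebuilt commits stamped at or before it would be skipped, while commits stamped after it would be delivered wholesale as new data.

```scala
// Toy model of a latest-commit-time offset; not Hudi's actual classes.
object OffsetSketch {
  // Stand-in for HoodieSourceOffset: the latest commit time already read.
  // Hudi commit times are timestamps like "20210125031320", so plain
  // string comparison matches chronological order.
  final case class SourceOffset(commitTime: String)

  // Commits strictly after the stored offset form the next micro-batch.
  def pendingCommits(timeline: Seq[String], offset: SourceOffset): Seq[String] =
    timeline.filter(_ > offset.commitTime)

  def main(args: Array[String]): Unit = {
    val offset = SourceOffset("20210125031320")

    // Normal incremental read: only the newer commit is picked up.
    println(pendingCommits(Seq("20210125031320", "20210125040000"), offset))

    // After the table is dropped and rebuilt, the checkpoint still holds
    // the old offset: rebuilt commits at or before it are silently skipped.
    println(pendingCommits(Seq("20210125020000", "20210125030000"), offset))
  }
}
```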