Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/02/18 06:13:58 UTC
[GitHub] [hudi] danny0405 opened a new pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
danny0405 opened a new pull request #2581:
URL: https://github.com/apache/hudi/pull/2581
… files
## What is the purpose of the pull request
The index should bootstrap from the existing base files, if there are any. In this
design, when `processElement` finds that a key does not exist in the index, we load
all the keys of that key's partition. If the partition has many records, processing
may block and trigger back pressure. Once all the records are loaded, we only need
to check the state each time a record is tagged.
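A minimal sketch of the lazy, per-partition bootstrap described above. It assumes plain Java collections in place of Flink keyed state. The class `LazyIndexBootstrap` and the `baseFileLoader` function, which stands in for reading record keys from a partition's latest base files, are hypothetical and not part of the PR.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical sketch: HashMap/HashSet stand in for Flink keyed state. */
final class LazyIndexBootstrap {
  // "partition:recordKey" -> file id; plays the role of the index state
  private final Map<String, String> index = new HashMap<>();
  // partitions whose existing keys have already been loaded
  private final Set<String> loadedPartitions = new HashSet<>();
  // hypothetical loader: partition path -> (record key -> file id) from its base files
  private final Function<String, Map<String, String>> baseFileLoader;
  private int partitionLoads = 0;

  LazyIndexBootstrap(Function<String, Map<String, String>> baseFileLoader) {
    this.baseFileLoader = baseFileLoader;
  }

  /** Returns the existing file id for the key, or null if the key is not indexed yet. */
  String tag(String partition, String recordKey) {
    if (!loadedPartitions.contains(partition)) {
      // First record for this partition: load all of its keys in one go.
      // This is the potentially slow step that can trigger back pressure.
      for (Map.Entry<String, String> e : baseFileLoader.apply(partition).entrySet()) {
        index.put(partition + ":" + e.getKey(), e.getValue());
      }
      loadedPartitions.add(partition);
      partitionLoads++;
    }
    // After the partition is loaded, tagging is only a state lookup.
    return index.get(partition + ":" + recordKey);
  }

  int partitionLoads() {
    return partitionLoads;
  }
}
```

The cost is paid once per partition: later records in the same partition never touch the base files again.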
## Brief change log
- Modify BucketAssignFunction to load existing records for indexing
## Verify this pull request
Added UTs.
## Committer checklist
- [ ] Has a corresponding JIRA in PR title & commit
- [ ] Commit message is descriptive of the change
- [ ] CI is green
- [ ] Necessary doc changes done or have another open PR
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [hudi] danny0405 commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
danny0405 commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r579947920
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
##########
@@ -37,6 +38,29 @@
*/
public class HoodieIndexUtils {
+ /**
+ * Fetches Pair of partition path and {@link HoodieBaseFile}s for interested partitions.
+ *
+ * @param partition Partition of interest
+ * @param context Instance of {@link HoodieEngineContext} to use
+ * @param hoodieTable Instance of {@link HoodieTable} of interest
+ * @return the list of {@link HoodieBaseFile}
+ */
+ public static List<HoodieBaseFile> getLatestBaseFilesForPartition(
Review comment:
I think we could.
[GitHub] [hudi] lamber-ken commented on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
lamber-ken commented on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-783053827
Hi @danny0405, is it necessary to move `this.bucketAssigner.reset();` back to `BucketAssignFunction#snapshotState` in this patch? :)
[GitHub] [hudi] lamber-ken commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
lamber-ken commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r579964461
##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -78,13 +130,14 @@ public BucketAssignFunction(Configuration conf) {
public void open(Configuration parameters) throws Exception {
super.open(parameters);
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
- HoodieFlinkEngineContext context =
- new HoodieFlinkEngineContext(
- new SerializableConfiguration(StreamerUtil.getHadoopConf()),
- new FlinkTaskContextSupplier(getRuntimeContext()));
- this.bucketAssigner = new BucketAssigner(
- context,
- writeConfig);
+ this.hadoopConf = StreamerUtil.getHadoopConf();
+ this.context = new HoodieFlinkEngineContext(
+ new SerializableConfiguration(this.hadoopConf),
+ new FlinkTaskContextSupplier(getRuntimeContext()));
+ this.bucketAssigner = new BucketAssigner(context, writeConfig);
+ final FileSystem fs = FSUtils.getFs(this.conf.getString(FlinkOptions.PATH), this.hadoopConf);
Review comment:
`fs` is never used; we can remove it.
[GitHub] [hudi] garyli1019 commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
garyli1019 commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r579779589
##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -146,5 +209,69 @@ public void notifyCheckpointComplete(long l) {
// Refresh the table state when there are new commits.
this.bucketAssigner.reset();
this.bucketAssigner.refreshTable();
+ checkPartitionsLoaded();
+ }
+
+ /**
+ * Load all the indices of give partition path into the backup state.
+ *
+ * @param partitionPath The partition path
+ * @throws Exception when error occurs for state update
+ */
+ private void loadRecords(String partitionPath) throws Exception {
+ HoodieTable<?, ?, ?, ?> hoodieTable = bucketAssigner.getTable();
+ List<HoodieBaseFile> latestBaseFiles =
+ HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, context, hoodieTable);
+ for (HoodieBaseFile baseFile : latestBaseFiles) {
+ List<HoodieKey> hoodieKeys =
+ ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new Path(baseFile.getPath()));
+ hoodieKeys.forEach(hoodieKey -> {
+ try {
+ this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
+ } catch (Exception e) {
+ throw new HoodieIOException("Error when load record keys from file: " + baseFile);
+ }
+ });
+ }
+ // Mark the partition path as loaded.
+ partitionLoadState.put(partitionPath, 0);
Review comment:
Maybe store a boolean instead of `0`?
##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -146,5 +209,69 @@ public void notifyCheckpointComplete(long l) {
// Refresh the table state when there are new commits.
this.bucketAssigner.reset();
this.bucketAssigner.refreshTable();
+ checkPartitionsLoaded();
+ }
+
+ /**
+ * Load all the indices of give partition path into the backup state.
+ *
+ * @param partitionPath The partition path
+ * @throws Exception when error occurs for state update
+ */
+ private void loadRecords(String partitionPath) throws Exception {
+ HoodieTable<?, ?, ?, ?> hoodieTable = bucketAssigner.getTable();
+ List<HoodieBaseFile> latestBaseFiles =
+ HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, context, hoodieTable);
+ for (HoodieBaseFile baseFile : latestBaseFiles) {
+ List<HoodieKey> hoodieKeys =
+ ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new Path(baseFile.getPath()));
+ hoodieKeys.forEach(hoodieKey -> {
+ try {
+ this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
+ } catch (Exception e) {
+ throw new HoodieIOException("Error when load record keys from file: " + baseFile);
+ }
+ });
+ }
+ // Mark the partition path as loaded.
+ partitionLoadState.put(partitionPath, 0);
+ }
+
+ /**
+ * Checks whether all the partitions of the table are loaded into the state,
+ * set the flag {@code allPartitionsLoaded} to true if it is.
+ */
+ private void checkPartitionsLoaded() {
+ for (String partition : this.allPartitionPath) {
+ try {
+ if (!this.partitionLoadState.contains(partition)) {
+ return;
+ }
+ } catch (Exception e) {
+ LOG.warn("Error when check whether all partitions are loaded, ignored", e);
+ throw new HoodieException(e);
+ }
+ }
+ this.allPartitionsLoaded = true;
+ }
Review comment:
IIUC, this check seems necessary because we don't update `partitionLoadState` when a new partition shows up in the incoming records, so we need to re-check after each commit. Otherwise, we would need to update `partitionLoadState` together with `indexState`.
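To make the comment above concrete, here is a minimal sketch of the post-commit check. It assumes a `HashSet` in place of the `partitionLoadState` Flink state and a plain collection in place of `allPartitionPath`; the class `PartitionLoadTracker` is hypothetical, while `checkPartitionsLoaded` and `needsLoad` follow the logic shown in the diff.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of the flag logic; a HashSet stands in for partitionLoadState. */
final class PartitionLoadTracker {
  private final Set<String> partitionLoadState = new HashSet<>();
  private boolean allPartitionsLoaded = false;

  /** Marks a partition as bootstrapped, i.e. its base-file keys are in the index. */
  void markLoaded(String partition) {
    partitionLoadState.add(partition);
  }

  /**
   * Called after each successful commit. Like the diff, it only ever switches the
   * flag on, and only when every known partition has been loaded.
   */
  void checkPartitionsLoaded(Collection<String> allPartitions) {
    for (String partition : allPartitions) {
      if (!partitionLoadState.contains(partition)) {
        return; // at least one partition still needs bootstrapping
      }
    }
    allPartitionsLoaded = true;
  }

  /** Mirrors the guard in processElement: true if the partition must be loaded first. */
  boolean needsLoad(String partition) {
    return !allPartitionsLoaded && !partitionLoadState.contains(partition);
  }

  boolean allPartitionsLoaded() {
    return allPartitionsLoaded;
  }
}
```

Note that once the flag is on, `needsLoad` returns false for every partition, including one that was never in the known list; this is the gap raised in the comment about `allPartitionPath` being initialized only in `open`.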
##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -78,13 +130,14 @@ public BucketAssignFunction(Configuration conf) {
public void open(Configuration parameters) throws Exception {
super.open(parameters);
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
- HoodieFlinkEngineContext context =
- new HoodieFlinkEngineContext(
- new SerializableConfiguration(StreamerUtil.getHadoopConf()),
- new FlinkTaskContextSupplier(getRuntimeContext()));
- this.bucketAssigner = new BucketAssigner(
- context,
- writeConfig);
+ this.context = new HoodieFlinkEngineContext(
+ new SerializableConfiguration(StreamerUtil.getHadoopConf()),
Review comment:
Can we use `this.hadoopConf` here? `getHadoopConf()` seems to be called twice.
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
##########
@@ -37,6 +38,29 @@
*/
public class HoodieIndexUtils {
+ /**
+ * Fetches Pair of partition path and {@link HoodieBaseFile}s for interested partitions.
+ *
+ * @param partition Partition of interest
+ * @param context Instance of {@link HoodieEngineContext} to use
+ * @param hoodieTable Instance of {@link HoodieTable} of interest
+ * @return the list of {@link HoodieBaseFile}
+ */
+ public static List<HoodieBaseFile> getLatestBaseFilesForPartition(
Review comment:
Shall we reuse this in `getLatestBaseFilesForAllPartitions` to avoid duplicate code?
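A minimal sketch of the suggested deduplication, assuming the all-partitions variant can be built as (partition, file) pairs from the single-partition lookup. The generic `BaseFileListing` helper is hypothetical and uses a sequential stream; the real Hudi method may distribute the work through the engine context instead.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical helper: the all-partitions listing delegates to the per-partition one. */
final class BaseFileListing {
  /**
   * Builds (partition, file) pairs for all partitions by calling the single-partition
   * lookup for each one, so the listing logic lives in exactly one place.
   */
  static <F> List<Map.Entry<String, F>> latestBaseFilesForAllPartitions(
      List<String> partitions, Function<String, List<F>> latestBaseFilesForPartition) {
    return partitions.stream()
        .flatMap(partition -> latestBaseFilesForPartition.apply(partition).stream()
            .map(file -> Map.entry(partition, file)))
        .collect(Collectors.toList());
  }
}
```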
[GitHub] [hudi] lamber-ken commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
lamber-ken commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r579066469
##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -112,6 +171,10 @@ public void processElement(I value, Context ctx, Collector<O> out) throws Except
final HoodieKey hoodieKey = record.getKey();
final BucketInfo bucketInfo;
final HoodieRecordLocation location;
+ if (!allPartitionsLoaded && !partitionLoadState.contains(hoodieKey.getPartitionPath())) {
Review comment:
By the way, `allPartitionPath` is only initialized in `BucketAssignFunction#open`; it seems `allPartitionPath` also needs to be updated?
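One possible way to address this, sketched under stated assumptions: the partition list is re-read after each commit and the flag is recomputed from scratch, so a newly discovered partition switches it off again. This differs from the diff, where the list is fetched once in `open` and the flag is only ever set to true. The `listPartitions` supplier and the class `RefreshingPartitionTracker` are hypothetical, and the sketch ignores the per-subtask filtering of partitions.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical sketch: refresh the known partitions on every commit. */
final class RefreshingPartitionTracker {
  private final Set<String> partitionLoadState = new HashSet<>();
  private final Supplier<List<String>> listPartitions; // hypothetical table listing
  private List<String> allPartitionPath;
  private boolean allPartitionsLoaded = false;

  RefreshingPartitionTracker(Supplier<List<String>> listPartitions) {
    this.listPartitions = listPartitions;
    // Equivalent of the one-time listing done in open().
    this.allPartitionPath = listPartitions.get();
  }

  void markLoaded(String partition) {
    partitionLoadState.add(partition);
  }

  /** After each commit: re-list the partitions, then recompute the flag from scratch. */
  void onCommitComplete() {
    this.allPartitionPath = listPartitions.get();
    this.allPartitionsLoaded = partitionLoadState.containsAll(allPartitionPath);
  }

  boolean allPartitionsLoaded() {
    return allPartitionsLoaded;
  }
}
```

The trade-off is an extra partition listing per commit, in exchange for never trusting a stale partition list.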
[GitHub] [hudi] codecov-io edited a comment on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
codecov-io edited a comment on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-781103554
[GitHub] [hudi] hk-lrzy commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
hk-lrzy commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r581156455
##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -78,13 +131,20 @@ public BucketAssignFunction(Configuration conf) {
public void open(Configuration parameters) throws Exception {
super.open(parameters);
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
- HoodieFlinkEngineContext context =
- new HoodieFlinkEngineContext(
- new SerializableConfiguration(StreamerUtil.getHadoopConf()),
- new FlinkTaskContextSupplier(getRuntimeContext()));
- this.bucketAssigner = new BucketAssigner(
- context,
- writeConfig);
+ this.hadoopConf = StreamerUtil.getHadoopConf();
+ this.context = new HoodieFlinkEngineContext(
+ new SerializableConfiguration(this.hadoopConf),
+ new FlinkTaskContextSupplier(getRuntimeContext()));
+ this.bucketAssigner = new BucketAssigner(context, writeConfig);
+ List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(this.context,
+ this.conf.getString(FlinkOptions.PATH), false, false, false);
+ final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
+ final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
+ final int taskID = getRuntimeContext().getIndexOfThisSubtask();
+ // reference: org.apache.flink.streaming.api.datastream.KeyedStream
+ this.initialPartitionsToLoad = allPartitionPaths.stream()
Review comment:
```java
if (context.isRestored()) {
  checkPartitionsLoaded();
}
```
When the job is restored from a checkpoint, `initialPartitionsToLoad` has not been initialized yet.
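The `open` snippet above filters `allPartitionPaths` by subtask index, with a reference to Flink's `KeyedStream`, presumably so that each subtask bootstraps only the partitions routed to it (the diff is cut off, so the exact filter is not shown). The sketch below re-implements Flink's key-to-subtask routing for illustration only, modeled on `KeyGroupRangeAssignment` and `MathUtils.murmurHash`, and assumes the stream is keyed by partition path. `PartitionRouting` is a hypothetical name; real code should call `KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism)` rather than copy the hash.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Illustrative re-implementation of Flink's key -> key group -> subtask routing. */
final class PartitionRouting {
  /** Murmur3-style mixing of a hashCode, returning a non-negative value. */
  static int murmurHash(int code) {
    code *= 0xcc9e2d51;
    code = Integer.rotateLeft(code, 15);
    code *= 0x1b873593;
    code = Integer.rotateLeft(code, 13);
    code = code * 5 + 0xe6546b64;
    code ^= 4;
    // final avalanche step
    code ^= code >>> 16;
    code *= 0x85ebca6b;
    code ^= code >>> 13;
    code *= 0xc2b2ae35;
    code ^= code >>> 16;
    if (code >= 0) {
      return code;
    }
    return code != Integer.MIN_VALUE ? -code : 0;
  }

  /** Maps a key to its key group, then the key group to a subtask index. */
  static int subtaskFor(Object key, int maxParallelism, int parallelism) {
    int keyGroup = murmurHash(key.hashCode()) % maxParallelism;
    return keyGroup * parallelism / maxParallelism;
  }

  /** Partitions this subtask should bootstrap, assuming records are keyed by partition path. */
  static List<String> partitionsToLoad(
      List<String> allPartitionPaths, int maxParallelism, int parallelism, int taskId) {
    return allPartitionPaths.stream()
        .filter(p -> subtaskFor(p, maxParallelism, parallelism) == taskId)
        .collect(Collectors.toList());
  }
}
```

Because the helper needs only the parallelism values and the partition list, it could also be evaluated in `initializeState`, which Flink invokes before `open`; that ordering is why a restore-time check cannot rely on a field first set in `open`.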
[GitHub] [hudi] codecov-io edited a comment on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
codecov-io edited a comment on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-781103554
# [Codecov](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=h1) Report
> Merging [#2581](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=desc) (af6d9b7) into [master](https://codecov.io/gh/apache/hudi/commit/43a0776c7c88a5f7beac6c8853db7e341810635a?el=desc) (43a0776) will **decrease** coverage by `41.45%`.
> The diff coverage is `n/a`.
```diff
@@ Coverage Diff @@
## master #2581 +/- ##
============================================
- Coverage 51.14% 9.69% -41.46%
+ Complexity 3215 48 -3167
============================================
Files 438 53 -385
Lines 20041 1929 -18112
Branches 2064 230 -1834
============================================
- Hits 10250 187 -10063
+ Misses 8946 1729 -7217
+ Partials 845 13 -832
```
| Flag | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| hudicli | `?` | `?` | |
| hudiclient | `?` | `?` | |
| hudicommon | `?` | `?` | |
| hudiflink | `?` | `?` | |
| hudihadoopmr | `?` | `?` | |
| hudisparkdatasource | `?` | `?` | |
| hudisync | `?` | `?` | |
| huditimelineservice | `?` | `?` | |
| hudiutilities | `9.69% <ø> (-59.78%)` | `0.00 <ø> (ø)` | |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
| [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
| [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
| [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
| [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
| [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-10.00%)` | |
| [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
| [...apache/hudi/utilities/sources/JsonKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkthZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-6.00%)` | |
| [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-5.00%)` | |
| [...lities/schema/SchemaProviderWithPostProcessor.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQcm92aWRlcldpdGhQb3N0UHJvY2Vzc29yLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
| ... and [412 more](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree-more) | |
[GitHub] [hudi] codecov-io edited a comment on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
codecov-io edited a comment on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-781103554
# [Codecov](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=h1) Report
> Merging [#2581](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=desc) (ba652e8) into [master](https://codecov.io/gh/apache/hudi/commit/43a0776c7c88a5f7beac6c8853db7e341810635a?el=desc) (43a0776) will **increase** coverage by `0.03%`.
> The diff coverage is `71.42%`.
```diff
@@ Coverage Diff @@
## master #2581 +/- ##
============================================
+ Coverage 51.14% 51.18% +0.03%
- Complexity 3215 3226 +11
============================================
Files 438 438
Lines 20041 20090 +49
Branches 2064 2069 +5
============================================
+ Hits 10250 10283 +33
- Misses 8946 8959 +13
- Partials 845 848 +3
```
| Flag | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| hudicli | `36.87% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudiclient | `100.00% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudicommon | `51.35% <ø> (-0.01%)` | `0.00 <ø> (ø)` | |
| hudiflink | `46.34% <71.42%> (+0.90%)` | `0.00 <10.00> (ø)` | |
| hudihadoopmr | `33.16% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudisparkdatasource | `69.75% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudisync | `48.61% <ø> (ø)` | `0.00 <ø> (ø)` | |
| huditimelineservice | `66.49% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudiutilities | `69.46% <ø> (ø)` | `0.00 <ø> (ø)` | |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [...ain/java/org/apache/hudi/avro/HoodieAvroUtils.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvYXZyby9Ib29kaWVBdnJvVXRpbHMuamF2YQ==) | `55.66% <ø> (-0.44%)` | `38.00 <0.00> (ø)` | |
| [...udi/operator/partitioner/BucketAssignFunction.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1mbGluay9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaHVkaS9vcGVyYXRvci9wYXJ0aXRpb25lci9CdWNrZXRBc3NpZ25GdW5jdGlvbi5qYXZh) | `78.65% <70.90%> (-13.66%)` | `18.00 <9.00> (+10.00)` | :arrow_down: |
| [...ache/hudi/operator/partitioner/BucketAssigner.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1mbGluay9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaHVkaS9vcGVyYXRvci9wYXJ0aXRpb25lci9CdWNrZXRBc3NpZ25lci5qYXZh) | `80.17% <100.00%> (+0.17%)` | `19.00 <1.00> (+1.00)` | |
[GitHub] [hudi] lamber-ken commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
lamber-ken commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r580040125
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
##########
@@ -37,6 +38,29 @@
*/
public class HoodieIndexUtils {
+ /**
+ * Fetches Pair of partition path and {@link HoodieBaseFile}s for interested partitions.
+ *
+ * @param partition Partition of interest
+ * @param context Instance of {@link HoodieEngineContext} to use
+ * @param hoodieTable Instance of {@link HoodieTable} of interest
+ * @return the list of {@link HoodieBaseFile}
+ */
+ public static List<HoodieBaseFile> getLatestBaseFilesForPartition(
Review comment:
Refer to `SparkHoodieBackedTableMetadataWriter#prepRecords`.
[GitHub] [hudi] lamber-ken commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
lamber-ken commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r579014347
##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -112,6 +171,10 @@ public void processElement(I value, Context ctx, Collector<O> out) throws Except
final HoodieKey hoodieKey = record.getKey();
final BucketInfo bucketInfo;
final HoodieRecordLocation location;
+ if (!allPartitionsLoaded && !partitionLoadState.contains(hoodieKey.getPartitionPath())) {
Review comment:
Here, with `&&`, the second condition will always be evaluated.
![image](https://user-images.githubusercontent.com/20113411/108479498-c6dbf500-72d0-11eb-819a-5c6e06a38d0f.png)
[GitHub] [hudi] codecov-io edited a comment on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
codecov-io edited a comment on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-781103554
[GitHub] [hudi] yanghua merged pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
yanghua merged pull request #2581:
URL: https://github.com/apache/hudi/pull/2581
[GitHub] [hudi] danny0405 commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
danny0405 commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r579002272
##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -112,6 +171,10 @@ public void processElement(I value, Context ctx, Collector<O> out) throws Except
final HoodieKey hoodieKey = record.getKey();
final BucketInfo bucketInfo;
final HoodieRecordLocation location;
+ if (!allPartitionsLoaded && !partitionLoadState.contains(hoodieKey.getPartitionPath())) {
Review comment:
No, the `allPartitionsLoaded` flag is there to speed things up: once it is set, there is no need to query the state, which is not very efficient.
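The exchange above turns on Java's short-circuit `&&`: until the flag is set, the right-hand state lookup runs for every record; once it is set, the lookup is skipped entirely. A minimal sketch that counts lookups to show both regimes, assuming a `HashSet` in place of the Flink state; `ShortCircuitGuard` and its counter are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch: counts how often the (comparatively expensive) state is queried. */
final class ShortCircuitGuard {
  boolean allPartitionsLoaded = false;
  private final Set<String> partitionLoadState = new HashSet<>();
  int stateLookups = 0;

  private boolean stateContains(String partition) {
    stateLookups++; // every call here represents one state access
    return partitionLoadState.contains(partition);
  }

  /** Same shape as the guard in processElement. */
  boolean needsLoad(String partition) {
    // When allPartitionsLoaded is true, the left operand is false and the
    // right operand (the state lookup) is never evaluated.
    return !allPartitionsLoaded && !stateContains(partition);
  }

  void markLoaded(String partition) {
    partitionLoadState.add(partition);
  }
}
```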
[GitHub] [hudi] lamber-ken commented on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
lamber-ken commented on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-783929502
Thanks @danny0405 for the base index patch. There may be some points to think about later. 👍
[GitHub] [hudi] yanghua commented on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
yanghua commented on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-783806234
@danny0405, could you please check the CI?
[GitHub] [hudi] danny0405 commented on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
danny0405 commented on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-783069192
> Hi @danny0405, is it necessary to move `this.bucketAssigner.reset();` back to `BucketAssignFunction#snapshotState` in this patch? :)
I will do it in another patch, thanks.
----------------------------------------------------------------
[GitHub] [hudi] codecov-io edited a comment on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-781103554
# [Codecov](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=h1) Report
> Merging [#2581](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=desc) (813fa19) into [master](https://codecov.io/gh/apache/hudi/commit/43a0776c7c88a5f7beac6c8853db7e341810635a?el=desc) (43a0776) will **increase** coverage by `0.03%`.
> The diff coverage is `72.00%`.
```diff
@@ Coverage Diff @@
## master #2581 +/- ##
============================================
+ Coverage 51.14% 51.18% +0.03%
- Complexity 3215 3226 +11
============================================
Files 438 438
Lines 20041 20084 +43
Branches 2064 2068 +4
============================================
+ Hits 10250 10279 +29
- Misses 8946 8958 +12
- Partials 845 847 +2
```
| Flag | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| hudicli | `36.87% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudiclient | `100.00% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudicommon | `51.35% <ø> (-0.01%)` | `0.00 <ø> (ø)` | |
| hudiflink | `46.25% <72.00%> (+0.81%)` | `0.00 <10.00> (ø)` | |
| hudihadoopmr | `33.16% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudisparkdatasource | `69.75% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudisync | `48.61% <ø> (ø)` | `0.00 <ø> (ø)` | |
| huditimelineservice | `66.49% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudiutilities | `69.46% <ø> (ø)` | `0.00 <ø> (ø)` | |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [...ain/java/org/apache/hudi/avro/HoodieAvroUtils.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvYXZyby9Ib29kaWVBdnJvVXRpbHMuamF2YQ==) | `55.66% <ø> (-0.44%)` | `38.00 <0.00> (ø)` | |
| [...udi/operator/partitioner/BucketAssignFunction.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1mbGluay9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaHVkaS9vcGVyYXRvci9wYXJ0aXRpb25lci9CdWNrZXRBc3NpZ25GdW5jdGlvbi5qYXZh) | `79.51% <71.42%> (-12.79%)` | `18.00 <9.00> (+10.00)` | :arrow_down: |
| [...ache/hudi/operator/partitioner/BucketAssigner.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1mbGluay9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaHVkaS9vcGVyYXRvci9wYXJ0aXRpb25lci9CdWNrZXRBc3NpZ25lci5qYXZh) | `80.17% <100.00%> (+0.17%)` | `19.00 <1.00> (+1.00)` | |
----------------------------------------------------------------
[GitHub] [hudi] codecov-io edited a comment on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-781103554
# [Codecov](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=h1) Report
> Merging [#2581](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=desc) (813fa19) into [master](https://codecov.io/gh/apache/hudi/commit/43a0776c7c88a5f7beac6c8853db7e341810635a?el=desc) (43a0776) will **decrease** coverage by `41.45%`.
> The diff coverage is `n/a`.
```diff
@@ Coverage Diff @@
## master #2581 +/- ##
============================================
- Coverage 51.14% 9.69% -41.46%
+ Complexity 3215 48 -3167
============================================
Files 438 53 -385
Lines 20041 1929 -18112
Branches 2064 230 -1834
============================================
- Hits 10250 187 -10063
+ Misses 8946 1729 -7217
+ Partials 845 13 -832
```
| Flag | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| hudicli | `?` | `?` | |
| hudiclient | `?` | `?` | |
| hudicommon | `?` | `?` | |
| hudiflink | `?` | `?` | |
| hudihadoopmr | `?` | `?` | |
| hudisparkdatasource | `?` | `?` | |
| hudisync | `?` | `?` | |
| huditimelineservice | `?` | `?` | |
| hudiutilities | `9.69% <ø> (-59.78%)` | `0.00 <ø> (ø)` | |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
| [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
| [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
| [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
| [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
| [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-10.00%)` | |
| [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
| [...apache/hudi/utilities/sources/JsonKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkthZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-6.00%)` | |
| [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-5.00%)` | |
| [...lities/schema/SchemaProviderWithPostProcessor.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQcm92aWRlcldpdGhQb3N0UHJvY2Vzc29yLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
| ... and [412 more](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree-more) | |
----------------------------------------------------------------
[GitHub] [hudi] codecov-io edited a comment on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-781103554
# [Codecov](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=h1) Report
> Merging [#2581](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=desc) (ba652e8) into [master](https://codecov.io/gh/apache/hudi/commit/43a0776c7c88a5f7beac6c8853db7e341810635a?el=desc) (43a0776) will **decrease** coverage by `1.83%`.
> The diff coverage is `71.42%`.
```diff
@@ Coverage Diff @@
## master #2581 +/- ##
============================================
- Coverage 51.14% 49.31% -1.84%
+ Complexity 3215 2848 -367
============================================
Files 438 381 -57
Lines 20041 17102 -2939
Branches 2064 1734 -330
============================================
- Hits 10250 8433 -1817
+ Misses 8946 8000 -946
+ Partials 845 669 -176
```
| Flag | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| hudicli | `36.87% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudiclient | `100.00% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudicommon | `51.35% <ø> (-0.01%)` | `0.00 <ø> (ø)` | |
| hudiflink | `46.34% <71.42%> (+0.90%)` | `0.00 <10.00> (ø)` | |
| hudihadoopmr | `33.16% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudisparkdatasource | `?` | `?` | |
| hudisync | `?` | `?` | |
| huditimelineservice | `?` | `?` | |
| hudiutilities | `69.46% <ø> (ø)` | `0.00 <ø> (ø)` | |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [...ain/java/org/apache/hudi/avro/HoodieAvroUtils.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvYXZyby9Ib29kaWVBdnJvVXRpbHMuamF2YQ==) | `55.66% <ø> (-0.44%)` | `38.00 <0.00> (ø)` | |
| [...udi/operator/partitioner/BucketAssignFunction.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1mbGluay9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaHVkaS9vcGVyYXRvci9wYXJ0aXRpb25lci9CdWNrZXRBc3NpZ25GdW5jdGlvbi5qYXZh) | `78.65% <70.90%> (-13.66%)` | `18.00 <9.00> (+10.00)` | :arrow_down: |
| [...ache/hudi/operator/partitioner/BucketAssigner.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1mbGluay9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaHVkaS9vcGVyYXRvci9wYXJ0aXRpb25lci9CdWNrZXRBc3NpZ25lci5qYXZh) | `80.17% <100.00%> (+0.17%)` | `19.00 <1.00> (+1.00)` | |
| [...udi/timeline/service/handlers/TimelineHandler.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS10aW1lbGluZS1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9odWRpL3RpbWVsaW5lL3NlcnZpY2UvaGFuZGxlcnMvVGltZWxpbmVIYW5kbGVyLmphdmE=) | | | |
| [.../hive/SlashEncodedHourPartitionValueExtractor.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1zeW5jL2h1ZGktaGl2ZS1zeW5jL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9odWRpL2hpdmUvU2xhc2hFbmNvZGVkSG91clBhcnRpdGlvblZhbHVlRXh0cmFjdG9yLmphdmE=) | | | |
| [.../hudi/internal/HoodieDataSourceInternalWriter.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3BhcmsyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9odWRpL2ludGVybmFsL0hvb2RpZURhdGFTb3VyY2VJbnRlcm5hbFdyaXRlci5qYXZh) | | | |
| [...main/java/org/apache/hudi/hive/HiveSyncConfig.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1zeW5jL2h1ZGktaGl2ZS1zeW5jL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9odWRpL2hpdmUvSGl2ZVN5bmNDb25maWcuamF2YQ==) | | | |
| [...n/java/org/apache/hudi/internal/DefaultSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3BhcmsyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9odWRpL2ludGVybmFsL0RlZmF1bHRTb3VyY2UuamF2YQ==) | | | |
| [...i/hive/SlashEncodedDayPartitionValueExtractor.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1zeW5jL2h1ZGktaGl2ZS1zeW5jL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9odWRpL2hpdmUvU2xhc2hFbmNvZGVkRGF5UGFydGl0aW9uVmFsdWVFeHRyYWN0b3IuamF2YQ==) | | | |
| [...i/internal/HoodieBulkInsertDataInternalWriter.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3BhcmsyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9odWRpL2ludGVybmFsL0hvb2RpZUJ1bGtJbnNlcnREYXRhSW50ZXJuYWxXcml0ZXIuamF2YQ==) | | | |
| ... and [50 more](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree-more) | |
----------------------------------------------------------------
[GitHub] [hudi] lamber-ken commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
Posted by GitBox <gi...@apache.org>.
lamber-ken commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r580736398
##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -78,13 +131,22 @@ public BucketAssignFunction(Configuration conf) {
public void open(Configuration parameters) throws Exception {
super.open(parameters);
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
- HoodieFlinkEngineContext context =
- new HoodieFlinkEngineContext(
- new SerializableConfiguration(StreamerUtil.getHadoopConf()),
- new FlinkTaskContextSupplier(getRuntimeContext()));
- this.bucketAssigner = new BucketAssigner(
- context,
- writeConfig);
+ this.hadoopConf = StreamerUtil.getHadoopConf();
+ this.context = new HoodieFlinkEngineContext(
+ new SerializableConfiguration(this.hadoopConf),
+ new FlinkTaskContextSupplier(getRuntimeContext()));
+ this.bucketAssigner = new BucketAssigner(context, writeConfig);
+ List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(this.context,
+ this.conf.getString(FlinkOptions.PATH), false, false, false);
+ final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
+ final int maxParallelism = getRuntimeContext().getExecutionConfig().getMaxParallelism() == -1
+ ? KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM
+ : getRuntimeContext().getExecutionConfig().getMaxParallelism();
Review comment:
We can use `getRuntimeContext().getMaxNumberOfParallelSubtasks()` instead.
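Some context on why the runtime value is safer: as far as I know, when no max parallelism is configured Flink does not simply use `KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM` (128). It derives a default from the operator parallelism (roughly 1.5x the parallelism, rounded up to a power of two and clamped to [128, 32768]), so the hard-coded fallback in the diff can disagree with the value the runtime actually uses for large parallelism. The sketch below re-implements that arithmetic, plus the key-group-to-subtask mapping, in plain Java for illustration only; the class `KeyGroupMath` is hypothetical and the formulas reflect my understanding of Flink's `KeyGroupRangeAssignment`, not a copy of it.

```java
/**
 * Illustrative re-implementation (assumption: mirrors the arithmetic of Flink's
 * KeyGroupRangeAssignment) showing why a hard-coded fallback of 128 can differ
 * from the max parallelism Flink derives at runtime.
 */
final class KeyGroupMath {
    static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7; // 128
    static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15;        // 32768

    private KeyGroupMath() {}

    /** Smallest power of two >= x, for 1 <= x <= 2^30. */
    static int roundUpToPowerOfTwo(int x) {
        return x <= 1 ? 1 : Integer.highestOneBit(x - 1) << 1;
    }

    /** Default max parallelism derived from the operator parallelism. */
    static int defaultMaxParallelism(int parallelism) {
        int candidate = roundUpToPowerOfTwo(parallelism + parallelism / 2);
        return Math.min(
                Math.max(candidate, DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
                UPPER_BOUND_MAX_PARALLELISM);
    }

    /** Index of the subtask that owns the given key group. */
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int parallelism = 100;
        // Hard-coded fallback from the diff vs. the default derived from the parallelism.
        System.out.println("fallback = " + DEFAULT_LOWER_BOUND_MAX_PARALLELISM);
        System.out.println("derived  = " + defaultMaxParallelism(parallelism));
    }
}
```

With parallelism 100, `main` prints 128 for the fallback and 256 for the derived default, so subtasks computing partition ownership from 128 would disagree with the runtime's key-group layout. Reading the value from the runtime context avoids duplicating this logic.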
----------------------------------------------------------------
[GitHub] [hudi] danny0405 commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r579915210
##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -146,5 +209,69 @@ public void notifyCheckpointComplete(long l) {
// Refresh the table state when there are new commits.
this.bucketAssigner.reset();
this.bucketAssigner.refreshTable();
+ checkPartitionsLoaded();
+ }
+
+ /**
+ * Load all the indices of the given partition path into the backup state.
+ *
+ * @param partitionPath The partition path
+ * @throws Exception when error occurs for state update
+ */
+ private void loadRecords(String partitionPath) throws Exception {
+ HoodieTable<?, ?, ?, ?> hoodieTable = bucketAssigner.getTable();
+ List<HoodieBaseFile> latestBaseFiles =
+ HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, context, hoodieTable);
+ for (HoodieBaseFile baseFile : latestBaseFiles) {
+ List<HoodieKey> hoodieKeys =
+ ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new Path(baseFile.getPath()));
+ hoodieKeys.forEach(hoodieKey -> {
+ try {
+ this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
+ } catch (Exception e) {
+ throw new HoodieIOException("Error when load record keys from file: " + baseFile);
+ }
+ });
+ }
+ // Mark the partition path as loaded.
+ partitionLoadState.put(partitionPath, 0);
Review comment:
> `The 0 is meaningless here`
It is meaningless anyway, because Flink does not have a Set state; the map value is only a placeholder.
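One way to address the readability concern while still using a map-shaped state is to give the placeholder value a name, so that readers see that only key presence matters (Flink's `MapState` offers `contains`, which the diff already uses). The sketch below is hypothetical: `LoadedPartitions` and `LOADED` are illustrative names, and a `java.util.Map` stands in for `MapState<String, Integer>` so it runs without Flink.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper emulating a "set" on top of map-shaped state, the way
 * partitionLoadState uses MapState<String, Integer>. A java.util.Map stands in
 * for Flink's MapState so this example runs without Flink on the classpath.
 */
final class LoadedPartitions {
    /** Named placeholder instead of a bare 0: only the presence of the key matters. */
    static final int LOADED = 0;

    private final Map<String, Integer> state = new HashMap<>();

    void markLoaded(String partitionPath) {
        state.put(partitionPath, LOADED);
    }

    boolean isLoaded(String partitionPath) {
        return state.containsKey(partitionPath);
    }

    public static void main(String[] args) {
        LoadedPartitions partitions = new LoadedPartitions();
        partitions.markLoaded("par1");
        System.out.println(partitions.isLoaded("par1")); // true
        System.out.println(partitions.isLoaded("par2")); // false
    }
}
```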
----------------------------------------------------------------
[GitHub] [hudi] lamber-ken commented on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
Posted by GitBox <gi...@apache.org>.
lamber-ken commented on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-781192449
> @lamber-ken You can also help to review this PR, if you would like to do it.
Yeah. By the way, big thanks to @danny0405 for RFC-24 👍
----------------------------------------------------------------
[GitHub] [hudi] lamber-ken commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
Posted by GitBox <gi...@apache.org>.
lamber-ken commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r579017041
##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -146,5 +209,69 @@ public void notifyCheckpointComplete(long l) {
// Refresh the table state when there are new commits.
this.bucketAssigner.reset();
this.bucketAssigner.refreshTable();
+ checkPartitionsLoaded();
+ }
+
+ /**
+ * Load all the indices of the given partition path into the backup state.
+ *
+ * @param partitionPath The partition path
+ * @throws Exception when error occurs for state update
+ */
+ private void loadRecords(String partitionPath) throws Exception {
+ HoodieTable<?, ?, ?, ?> hoodieTable = bucketAssigner.getTable();
+ List<HoodieBaseFile> latestBaseFiles =
+ HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, context, hoodieTable);
+ for (HoodieBaseFile baseFile : latestBaseFiles) {
+ List<HoodieKey> hoodieKeys =
+ ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new Path(baseFile.getPath()));
+ hoodieKeys.forEach(hoodieKey -> {
+ try {
+ this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
+ } catch (Exception e) {
+ throw new HoodieIOException("Error when load record keys from file: " + baseFile);
+ }
+ });
+ }
+ // Mark the partition path as loaded.
+ partitionLoadState.put(partitionPath, 0);
Review comment:
The `0` is meaningless here; it may not be intuitive for beginners.
----------------------------------------------------------------
[GitHub] [hudi] hk-lrzy commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
Posted by GitBox <gi...@apache.org>.
hk-lrzy commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r581232576
##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -100,6 +160,12 @@ public void initializeState(FunctionInitializationContext context) {
TypeInformation.of(HoodieKey.class),
TypeInformation.of(HoodieRecordLocation.class));
indexState = context.getKeyedStateStore().getMapState(indexStateDesc);
+ MapStateDescriptor<String, Integer> partitionLoadStateDesc =
+ new MapStateDescriptor<>("partitionLoadState", Types.STRING, Types.INT);
+ partitionLoadState = context.getKeyedStateStore().getMapState(partitionLoadStateDesc);
Review comment:
I remember that `partitionLoadState` is keyed state rather than operator state, so it can't be accessed in `initializeState` or `notifyCheckpointComplete`.
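The reason is that keyed state is always scoped to a "current key", which the runtime sets only while a record is being processed; `initializeState` can obtain the state handle, but reading or writing values there or in `notifyCheckpointComplete` has no key to resolve against. The toy model below illustrates that scoping in plain Java. It is not Flink's API: `ToyKeyedState` is hypothetical, and it throws an `IllegalStateException`, whereas Flink reports its own "no key set" error.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model (not Flink's API) of keyed state: every value is scoped to the
 * current key, which the runtime sets only while processing a record.
 */
final class ToyKeyedState<K, V> {
    private final Map<K, V> valuesByKey = new HashMap<>();
    private K currentKey; // null outside of a keyed context

    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    void clearCurrentKey() {
        this.currentKey = null;
    }

    V value() {
        return valuesByKey.get(requireKey());
    }

    void update(V value) {
        valuesByKey.put(requireKey(), value);
    }

    private K requireKey() {
        if (currentKey == null) {
            // Models the failure seen when keyed state is used with no current key,
            // e.g. from notifyCheckpointComplete.
            throw new IllegalStateException("No key set: called outside of a keyed context");
        }
        return currentKey;
    }

    public static void main(String[] args) {
        ToyKeyedState<String, Boolean> loaded = new ToyKeyedState<>();
        loaded.setCurrentKey("uuid-1");     // as inside processElement
        loaded.update(Boolean.TRUE);
        System.out.println(loaded.value()); // true
        loaded.clearCurrentKey();           // as in notifyCheckpointComplete
        try {
            loaded.value();
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Note that the same scoping also means a per-key `partitionLoadState` records "loaded" only for the record key that triggered the load, not for the partition as a whole.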
----------------------------------------------------------------
[GitHub] [hudi] danny0405 commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r579002846
##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -146,5 +209,69 @@ public void notifyCheckpointComplete(long l) {
// Refresh the table state when there are new commits.
this.bucketAssigner.reset();
this.bucketAssigner.refreshTable();
+ checkPartitionsLoaded();
+ }
+
+ /**
+ * Load all the indices of the given partition path into the backup state.
+ *
+ * @param partitionPath The partition path
+ * @throws Exception when error occurs for state update
+ */
+ private void loadRecords(String partitionPath) throws Exception {
+ HoodieTable<?, ?, ?, ?> hoodieTable = bucketAssigner.getTable();
+ List<HoodieBaseFile> latestBaseFiles =
+ HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, context, hoodieTable);
+ for (HoodieBaseFile baseFile : latestBaseFiles) {
+ List<HoodieKey> hoodieKeys =
+ ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new Path(baseFile.getPath()));
+ hoodieKeys.forEach(hoodieKey -> {
+ try {
+ this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
+ } catch (Exception e) {
+ throw new HoodieIOException("Error when load record keys from file: " + baseFile);
+ }
+ });
+ }
+ // Mark the partition path as loaded.
+ partitionLoadState.put(partitionPath, 0);
Review comment:
It is okay because only one code snippet uses it.
----------------------------------------------------------------
[GitHub] [hudi] yanghua commented on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
Posted by GitBox <gi...@apache.org>.
yanghua commented on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-781141632
@lamber-ken You can also help to review this PR, if you would like to do it.
----------------------------------------------------------------
[GitHub] [hudi] danny0405 commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r580010866
##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -112,6 +171,10 @@ public void processElement(I value, Context ctx, Collector<O> out) throws Except
final HoodieKey hoodieKey = record.getKey();
final BucketInfo bucketInfo;
final HoodieRecordLocation location;
+ if (!allPartitionsLoaded && !partitionLoadState.contains(hoodieKey.getPartitionPath())) {
Review comment:
We only need to ensure that the initial partitions are loaded successfully; if new input data belongs to new data partitions, it triggers the index update itself.
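The gating described here can be sketched as a small single-threaded model: each record checks whether its partition was loaded, loads it on first sight, and once a checkpoint confirms that every initial partition is loaded, the per-record check is skipped entirely. The class `LazyIndexBootstrap` is hypothetical; it reuses the names `allPartitionsLoaded`, `loadRecords` and `checkPartitionsLoaded` from the diff, but a `Set` replaces Flink state (ignoring the keyed-state scoping raised in review) and `loadRecords` only counts calls instead of reading base files.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Simplified model of the lazy index bootstrap. Hypothetical: a Set replaces
 * Flink state and loadRecords only counts calls instead of scanning base files.
 */
final class LazyIndexBootstrap {
    private final Set<String> initialPartitions;
    private final Set<String> loadedPartitions = new HashSet<>();
    private boolean allPartitionsLoaded = false;
    private int loadCalls = 0;

    LazyIndexBootstrap(List<String> initialPartitions) {
        this.initialPartitions = new HashSet<>(initialPartitions);
    }

    /** Called per record: loads a partition's keys the first time it is seen. */
    void processElement(String partitionPath) {
        if (!allPartitionsLoaded && !loadedPartitions.contains(partitionPath)) {
            loadRecords(partitionPath);
        }
        // ... tag the record against the index state ...
    }

    private void loadRecords(String partitionPath) {
        loadCalls++;                          // stand-in for reading keys from base files
        loadedPartitions.add(partitionPath);  // mark the partition as loaded
    }

    /** Called on checkpoint completion: stop checking once all initial partitions are loaded. */
    void checkPartitionsLoaded() {
        if (loadedPartitions.containsAll(initialPartitions)) {
            allPartitionsLoaded = true;
        }
    }

    boolean isAllPartitionsLoaded() {
        return allPartitionsLoaded;
    }

    int loadCalls() {
        return loadCalls;
    }

    public static void main(String[] args) {
        LazyIndexBootstrap bootstrap = new LazyIndexBootstrap(List.of("p1", "p2"));
        bootstrap.processElement("p1");
        bootstrap.processElement("p2");
        bootstrap.checkPartitionsLoaded();
        System.out.println(bootstrap.isAllPartitionsLoaded()); // true
    }
}
```

New partitions that appear after the flag flips are not scanned, which is safe because they have no base files written before the job started; their keys enter the index as records are tagged.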
----------------------------------------------------------------
[GitHub] [hudi] lamber-ken commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
Posted by GitBox <gi...@apache.org>.
lamber-ken commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r580038941
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
##########
@@ -37,6 +38,29 @@
*/
public class HoodieIndexUtils {
+ /**
+ * Fetches Pair of partition path and {@link HoodieBaseFile}s for interested partitions.
+ *
+ * @param partition Partition of interest
+ * @param context Instance of {@link HoodieEngineContext} to use
+ * @param hoodieTable Instance of {@link HoodieTable} of interest
+ * @return the list of {@link HoodieBaseFile}
+ */
+ public static List<HoodieBaseFile> getLatestBaseFilesForPartition(
Review comment:
It's good that `getLatestBaseFilesForPartition` was extracted from `getLatestBaseFilesForAllPartitions`.
Current codebase:
```java
public static List<HoodieBaseFile> getLatestBaseFilesForPartition(
final String partition,
final HoodieTable hoodieTable) {
Option<HoodieInstant> latestCommitTime = hoodieTable.getMetaClient().getCommitsTimeline()
.filterCompletedInstants().lastInstant();
if (latestCommitTime.isPresent()) {
return hoodieTable.getBaseFileOnlyView()
.getLatestBaseFilesBeforeOrOn(partition, latestCommitTime.get().getTimestamp())
.collect(toList());
}
return Collections.emptyList();
}
```
Maybe the following implementation is more efficient:
```java
public static List<HoodieBaseFile> getLatestBaseFilesForPartition(
final String partition,
final HoodieTable hoodieTable) {
return hoodieTable.getFileSystemView()
.getAllFileGroups(partition)
.map(HoodieFileGroup::getLatestDataFile)
.filter(Option::isPresent)
.map(Option::get)
.collect(toList());
}
```
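To make the semantic difference between the two snippets concrete, here is a simplified model in plain Java: strategy (a) picks the latest base file per file group whose commit time is at or before a given instant (as the current code does with the latest completed instant), and strategy (b) picks the latest base file per file group with no bound. The class `BaseFileSelection` is hypothetical and ignores details of Hudi's file-system view such as file slices and pending compactions; it relies on Hudi instant times being fixed-width timestamps, so plain string comparison orders them. It shows that when the bound is the latest completed instant and every listed file is committed, both strategies return the same files.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model (not Hudi's API) of two base-file selection strategies. */
final class BaseFileSelection {
    static final class BaseFile {
        final String fileId;
        final String commitTime;

        BaseFile(String fileId, String commitTime) {
            this.fileId = fileId;
            this.commitTime = commitTime;
        }

        @Override
        public String toString() {
            return fileId + "@" + commitTime;
        }
    }

    /** Strategy (a): latest file per group with commitTime <= maxInstant. */
    static List<BaseFile> latestBeforeOrOn(List<BaseFile> files, String maxInstant) {
        return select(files, maxInstant);
    }

    /** Strategy (b): latest file per group; assumes every file belongs to a completed commit. */
    static List<BaseFile> latestPerGroup(List<BaseFile> files) {
        return select(files, null);
    }

    /** maxInstant == null means "no upper bound". */
    private static List<BaseFile> select(List<BaseFile> files, String maxInstant) {
        Map<String, BaseFile> latest = new TreeMap<>(); // sorted by fileId for stable output
        for (BaseFile f : files) {
            if (maxInstant != null && f.commitTime.compareTo(maxInstant) > 0) {
                continue; // written after the bound
            }
            // Keep whichever file has the later commit time.
            latest.merge(f.fileId, f, (a, b) -> a.commitTime.compareTo(b.commitTime) >= 0 ? a : b);
        }
        return new ArrayList<>(latest.values());
    }

    public static void main(String[] args) {
        List<BaseFile> files = List.of(
                new BaseFile("f1", "001"), new BaseFile("f1", "003"), new BaseFile("f2", "002"));
        System.out.println(latestBeforeOrOn(files, "003")); // [f1@003, f2@002]
        System.out.println(latestPerGroup(files));          // [f1@003, f2@002]
    }
}
```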
----------------------------------------------------------------
[GitHub] [hudi] danny0405 commented on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
Posted by GitBox <gi...@apache.org>.
danny0405 commented on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-783819474
> @danny0405 please check the CI?
The failure should not be caused by this PR; I have re-triggered the tests.
----------------------------------------------------------------
[GitHub] [hudi] danny0405 commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r580094188
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
##########
@@ -37,6 +38,29 @@
*/
public class HoodieIndexUtils {
+ /**
+ * Fetches Pair of partition path and {@link HoodieBaseFile}s for interested partitions.
+ *
+ * @param partition Partition of interest
+ * @param context Instance of {@link HoodieEngineContext} to use
+ * @param hoodieTable Instance of {@link HoodieTable} of interest
+ * @return the list of {@link HoodieBaseFile}
+ */
+ public static List<HoodieBaseFile> getLatestBaseFilesForPartition(
Review comment:
Agreed, there is no need to determine and compare the instant time here, but I would not make that improvement in this PR because it is unrelated.
You can propose it in a separate JIRA issue.
----------------------------------------------------------------
[GitHub] [hudi] codecov-io edited a comment on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-781103554
# [Codecov](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=h1) Report
> Merging [#2581](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=desc) (ba652e8) into [master](https://codecov.io/gh/apache/hudi/commit/43a0776c7c88a5f7beac6c8853db7e341810635a?el=desc) (43a0776) will **increase** coverage by `0.03%`.
> The diff coverage is `71.42%`.
```diff
@@ Coverage Diff @@
## master #2581 +/- ##
============================================
+ Coverage 51.14% 51.18% +0.03%
- Complexity 3215 3226 +11
============================================
Files 438 438
Lines 20041 20090 +49
Branches 2064 2069 +5
============================================
+ Hits 10250 10283 +33
- Misses 8946 8959 +13
- Partials 845 848 +3
```
| Flag | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| hudicli | `36.87% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudiclient | `100.00% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudicommon | `51.35% <ø> (-0.01%)` | `0.00 <ø> (ø)` | |
| hudiflink | `46.34% <71.42%> (+0.90%)` | `0.00 <10.00> (ø)` | |
| hudihadoopmr | `33.16% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudisparkdatasource | `69.75% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudisync | `48.61% <ø> (ø)` | `0.00 <ø> (ø)` | |
| huditimelineservice | `66.49% <ø> (ø)` | `0.00 <ø> (ø)` | |
| hudiutilities | `69.46% <ø> (ø)` | `0.00 <ø> (ø)` | |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [...ain/java/org/apache/hudi/avro/HoodieAvroUtils.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvYXZyby9Ib29kaWVBdnJvVXRpbHMuamF2YQ==) | `55.66% <ø> (-0.44%)` | `38.00 <0.00> (ø)` | |
| [...udi/operator/partitioner/BucketAssignFunction.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1mbGluay9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaHVkaS9vcGVyYXRvci9wYXJ0aXRpb25lci9CdWNrZXRBc3NpZ25GdW5jdGlvbi5qYXZh) | `78.65% <70.90%> (-13.66%)` | `18.00 <9.00> (+10.00)` | :arrow_down: |
| [...ache/hudi/operator/partitioner/BucketAssigner.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1mbGluay9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaHVkaS9vcGVyYXRvci9wYXJ0aXRpb25lci9CdWNrZXRBc3NpZ25lci5qYXZh) | `80.17% <100.00%> (+0.17%)` | `19.00 <1.00> (+1.00)` | |
----------------------------------------------------------------
[GitHub] [hudi] danny0405 commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r581553993
##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -78,13 +131,20 @@ public BucketAssignFunction(Configuration conf) {
public void open(Configuration parameters) throws Exception {
super.open(parameters);
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
- HoodieFlinkEngineContext context =
- new HoodieFlinkEngineContext(
- new SerializableConfiguration(StreamerUtil.getHadoopConf()),
- new FlinkTaskContextSupplier(getRuntimeContext()));
- this.bucketAssigner = new BucketAssigner(
- context,
- writeConfig);
+ this.hadoopConf = StreamerUtil.getHadoopConf();
+ this.context = new HoodieFlinkEngineContext(
+ new SerializableConfiguration(this.hadoopConf),
+ new FlinkTaskContextSupplier(getRuntimeContext()));
+ this.bucketAssigner = new BucketAssigner(context, writeConfig);
+ List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(this.context,
+ this.conf.getString(FlinkOptions.PATH), false, false, false);
+ final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
+ final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
+ final int taskID = getRuntimeContext().getIndexOfThisSubtask();
+ // reference: org.apache.flink.streaming.api.datastream.KeyedStream
+ this.initialPartitionsToLoad = allPartitionPaths.stream()
Review comment:
Yes, you are welcome to submit a fix and add test cases.
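For context on the `// reference: org.apache.flink.streaming.api.datastream.KeyedStream` line in the hunk above: the idea is to load only the partitions whose keys Flink would route to this subtask. The following is a self-contained sketch, not the PR's code. `SubtaskAssignment` and its methods are hypothetical names, and the hashing is written, to the best of my knowledge, to mirror Flink's `KeyGroupRangeAssignment` and `MathUtils.murmurHash`; check it against the Flink version in use before relying on exact indices. Note that such a filter only matches what a subtask actually receives if the upstream `keyBy` uses the partition path as the key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: which partition paths a subtask would own under keyBy(partitionPath). */
final class SubtaskAssignment {

  private SubtaskAssignment() {}

  /** Murmur3 finalizer; intended to mirror Flink's MathUtils.bitMix. */
  static int bitMix(int in) {
    in ^= in >>> 16;
    in *= 0x85ebca6b;
    in ^= in >>> 13;
    in *= 0xc2b2ae35;
    in ^= in >>> 16;
    return in;
  }

  /** Intended to mirror Flink's MathUtils.murmurHash(int); always returns a non-negative value. */
  static int murmurHash(int code) {
    code *= 0xcc9e2d51;
    code = Integer.rotateLeft(code, 15);
    code *= 0x1b873593;
    code = Integer.rotateLeft(code, 13);
    code = code * 5 + 0xe6546b64;
    code ^= 4;
    code = bitMix(code);
    if (code >= 0) {
      return code;
    } else if (code != Integer.MIN_VALUE) {
      return -code;
    }
    return 0;
  }

  /** Key -> key group in [0, maxParallelism). */
  static int keyGroupFor(Object key, int maxParallelism) {
    return murmurHash(key.hashCode()) % maxParallelism;
  }

  /** Key group -> subtask index in [0, parallelism), using contiguous key-group ranges. */
  static int subtaskFor(Object key, int maxParallelism, int parallelism) {
    return keyGroupFor(key, maxParallelism) * parallelism / maxParallelism;
  }

  /** Filters the table's partition paths down to those routed to subtask {@code taskId}. */
  static List<String> partitionsForSubtask(
      List<String> allPartitionPaths, int maxParallelism, int parallelism, int taskId) {
    List<String> owned = new ArrayList<>();
    for (String partitionPath : allPartitionPaths) {
      if (subtaskFor(partitionPath, maxParallelism, parallelism) == taskId) {
        owned.add(partitionPath);
      }
    }
    return owned;
  }

  public static void main(String[] args) {
    List<String> partitions = Arrays.asList("par1", "par2", "par3", "par4");
    for (int taskId = 0; taskId < 2; taskId++) {
      System.out.println(taskId + " -> " + partitionsForSubtask(partitions, 128, 2, taskId));
    }
  }
}
```

Every partition path lands on exactly one subtask, so the per-subtask lists partition the table's paths without overlap.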
----------------------------------------------------------------
[GitHub] [hudi] danny0405 commented on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
Posted by GitBox <gi...@apache.org>.
danny0405 commented on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-783951593
> Thanks @danny0405 for the base index patch, maybe there are some points to think about later, 👍
Thanks! If you have new ideas, contributions are welcome.
----------------------------------------------------------------
[GitHub] [hudi] danny0405 commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r579977745
##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -78,13 +130,14 @@ public BucketAssignFunction(Configuration conf) {
public void open(Configuration parameters) throws Exception {
super.open(parameters);
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
- HoodieFlinkEngineContext context =
- new HoodieFlinkEngineContext(
- new SerializableConfiguration(StreamerUtil.getHadoopConf()),
- new FlinkTaskContextSupplier(getRuntimeContext()));
- this.bucketAssigner = new BucketAssigner(
- context,
- writeConfig);
+ this.hadoopConf = StreamerUtil.getHadoopConf();
+ this.context = new HoodieFlinkEngineContext(
+ new SerializableConfiguration(this.hadoopConf),
+ new FlinkTaskContextSupplier(getRuntimeContext()));
+ this.bucketAssigner = new BucketAssigner(context, writeConfig);
+ final FileSystem fs = FSUtils.getFs(this.conf.getString(FlinkOptions.PATH), this.hadoopConf);
Review comment:
Already removed.
----------------------------------------------------------------
[GitHub] [hudi] lamber-ken commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
Posted by GitBox <gi...@apache.org>.
lamber-ken commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r578893011
##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -146,5 +209,69 @@ public void notifyCheckpointComplete(long l) {
// Refresh the table state when there are new commits.
this.bucketAssigner.reset();
this.bucketAssigner.refreshTable();
+ checkPartitionsLoaded();
+ }
+
+ /**
+ * Load all the indices of the given partition path into the backup state.
+ *
+ * @param partitionPath The partition path
+ * @throws Exception when error occurs for state update
+ */
+ private void loadRecords(String partitionPath) throws Exception {
+ HoodieTable<?, ?, ?, ?> hoodieTable = bucketAssigner.getTable();
+ List<HoodieBaseFile> latestBaseFiles =
+ HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, context, hoodieTable);
+ for (HoodieBaseFile baseFile : latestBaseFiles) {
+ List<HoodieKey> hoodieKeys =
+ ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new Path(baseFile.getPath()));
+ hoodieKeys.forEach(hoodieKey -> {
+ try {
+ this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
+ } catch (Exception e) {
+ throw new HoodieIOException("Error when load record keys from file: " + baseFile);
+ }
+ });
+ }
+ // Mark the partition path as loaded.
+ partitionLoadState.put(partitionPath, 0);
Review comment:
Rather than the literal `0`, it would be better to use a static constant.
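A minimal sketch of `loadRecords` with the marker named as a constant, as suggested. Everything here is hypothetical: plain `HashMap`s stand in for Flink keyed state, `BaseFile` stands in for `HoodieBaseFile` plus the keys read via `ParquetUtils`, the index is keyed by record key rather than `HoodieKey`, and `PARTITION_LOADED` is only an illustrative name.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of the bootstrap step; plain maps stand in for Flink keyed state. */
final class IndexBootstrapSketch {

  /** Named marker for "partition loaded", replacing the bare literal 0. */
  static final int PARTITION_LOADED = 0;

  /** Hypothetical stand-in for HoodieRecordLocation: instant time plus file id. */
  static final class Location {
    final String instantTime;
    final String fileId;

    Location(String instantTime, String fileId) {
      this.instantTime = instantTime;
      this.fileId = fileId;
    }
  }

  /** Hypothetical stand-in for a latest base file together with the record keys read from it. */
  static final class BaseFile {
    final String commitTime;
    final String fileId;
    final List<String> recordKeys;

    BaseFile(String commitTime, String fileId, List<String> recordKeys) {
      this.commitTime = commitTime;
      this.fileId = fileId;
      this.recordKeys = recordKeys;
    }
  }

  /** Record key -> location (the diff keys this by HoodieKey instead). */
  final Map<String, Location> indexState = new HashMap<>();
  /** Partition path -> load marker. */
  final Map<String, Integer> partitionLoadState = new HashMap<>();

  /** Indexes every key of every latest base file, then marks the partition as loaded. */
  void loadRecords(String partitionPath, List<BaseFile> latestBaseFiles) {
    for (BaseFile baseFile : latestBaseFiles) {
      for (String recordKey : baseFile.recordKeys) {
        indexState.put(recordKey, new Location(baseFile.commitTime, baseFile.fileId));
      }
    }
    partitionLoadState.put(partitionPath, PARTITION_LOADED);
  }

  public static void main(String[] args) {
    IndexBootstrapSketch sketch = new IndexBootstrapSketch();
    sketch.loadRecords("par1", Arrays.asList(
        new BaseFile("001", "f1", Arrays.asList("id1", "id2")),
        new BaseFile("002", "f2", Arrays.asList("id3"))));
    System.out.println(sketch.indexState.get("id3").fileId);          // f2
    System.out.println(sketch.partitionLoadState.containsKey("par1")); // true
  }
}
```

Marking the partition only after all its files are indexed means a failure mid-load leaves the partition unmarked, so it would be loaded again later.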
##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -112,6 +171,10 @@ public void processElement(I value, Context ctx, Collector<O> out) throws Except
final HoodieKey hoodieKey = record.getKey();
final BucketInfo bucketInfo;
final HoodieRecordLocation location;
+ if (!allPartitionsLoaded && !partitionLoadState.contains(hoodieKey.getPartitionPath())) {
Review comment:
The `allPartitionsLoaded` member variable seems redundant; can we use only `partitionLoadState`?
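To illustrate the question, here is a sketch of lazy bootstrap in which the per-partition marker alone decides whether to load. The class, the `loader` function and `loadCount` are hypothetical; a `HashSet` stands in for `partitionLoadState` and a `HashMap` for the index state.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical sketch: lazy per-partition bootstrap driven only by the load markers. */
final class LazyIndexSketch {

  /** Stand-in for partitionLoadState: partitions whose base files were already indexed. */
  private final Set<String> loadedPartitions = new HashSet<>();
  /** Record key -> file id. */
  private final Map<String, String> index = new HashMap<>();
  /** Hypothetical loader: partition path -> (record key -> file id) read from its base files. */
  private final Function<String, Map<String, String>> loader;
  /** Number of partition loads performed, for illustration only. */
  int loadCount = 0;

  LazyIndexSketch(Function<String, Map<String, String>> loader) {
    this.loader = loader;
  }

  /** Returns the file id that already holds the key, or null if the record is an insert. */
  String tag(String partitionPath, String recordKey) {
    if (!loadedPartitions.contains(partitionPath)) {
      index.putAll(loader.apply(partitionPath)); // load before marking, so a failed load is retried
      loadedPartitions.add(partitionPath);
      loadCount++;
    }
    return index.get(recordKey);
  }

  public static void main(String[] args) {
    Map<String, String> par1 = new HashMap<>();
    par1.put("id1", "f1");
    LazyIndexSketch sketch = new LazyIndexSketch(
        p -> p.equals("par1") ? par1 : Collections.<String, String>emptyMap());
    System.out.println(sketch.tag("par1", "id1")); // f1 (existing record)
    System.out.println(sketch.tag("par1", "id2")); // null (insert)
    System.out.println(sketch.loadCount);          // 1
  }
}
```

One possible argument for keeping a separate flag: once every partition is loaded, it lets `processElement` skip the per-record state lookup, which may cost more than a field read when the state lives in RocksDB.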
##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -146,5 +209,69 @@ public void notifyCheckpointComplete(long l) {
// Refresh the table state when there are new commits.
this.bucketAssigner.reset();
this.bucketAssigner.refreshTable();
+ checkPartitionsLoaded();
+ }
+
+ /**
+ * Load all the indices of the given partition path into the backup state.
+ *
+ * @param partitionPath The partition path
+ * @throws Exception when error occurs for state update
+ */
+ private void loadRecords(String partitionPath) throws Exception {
+ HoodieTable<?, ?, ?, ?> hoodieTable = bucketAssigner.getTable();
+ List<HoodieBaseFile> latestBaseFiles =
+ HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, context, hoodieTable);
+ for (HoodieBaseFile baseFile : latestBaseFiles) {
+ List<HoodieKey> hoodieKeys =
+ ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new Path(baseFile.getPath()));
+ hoodieKeys.forEach(hoodieKey -> {
+ try {
+ this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
+ } catch (Exception e) {
+ throw new HoodieIOException("Error when load record keys from file: " + baseFile);
+ }
+ });
+ }
+ // Mark the partition path as loaded.
+ partitionLoadState.put(partitionPath, 0);
+ }
+
+ /**
+ * Checks whether all the partitions of the table are loaded into the state,
+ * set the flag {@code allPartitionsLoaded} to true if it is.
+ */
+ private void checkPartitionsLoaded() {
+ for (String partition : this.allPartitionPath) {
+ try {
+ if (!this.partitionLoadState.contains(partition)) {
+ return;
+ }
+ } catch (Exception e) {
+ LOG.warn("Error when check whether all partitions are loaded, ignored", e);
+ throw new HoodieException(e);
+ }
+ }
+ this.allPartitionsLoaded = true;
+ }
Review comment:
The `checkPartitionsLoaded()` method seems redundant, for the same reason as `allPartitionsLoaded`.
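A sketch of deriving the "all loaded" answer directly from the load markers, with an optional cache. `LoadTracker` is hypothetical and assumes the partition list is the snapshot taken in `open()` and that markers are only ever added; under those assumptions a cached `true` never becomes stale, and an empty table reports "loaded" immediately.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch: derive "all partitions loaded" from the per-partition markers. */
final class LoadTracker {

  /** Partition paths listed once at open(), as in the diff. */
  private final List<String> allPartitionPaths;
  /** Partitions whose base files have been indexed. */
  private final Set<String> loaded = new HashSet<>();
  /** Cached answer; safe because the list above is fixed and the set only grows. */
  private boolean allLoaded;

  LoadTracker(List<String> allPartitionPaths) {
    this.allPartitionPaths = allPartitionPaths;
  }

  void markLoaded(String partitionPath) {
    loaded.add(partitionPath);
  }

  /** True once every partition known at open() has been loaded. */
  boolean allLoaded() {
    if (!allLoaded) {
      allLoaded = loaded.containsAll(allPartitionPaths);
    }
    return allLoaded;
  }

  public static void main(String[] args) {
    LoadTracker tracker = new LoadTracker(Arrays.asList("par1", "par2"));
    tracker.markLoaded("par1");
    System.out.println(tracker.allLoaded()); // false
    tracker.markLoaded("par2");
    System.out.println(tracker.allLoaded()); // true
  }
}
```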
----------------------------------------------------------------
[GitHub] [hudi] codecov-io edited a comment on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-781103554
----------------------------------------------------------------
[GitHub] [hudi] codecov-io commented on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…
Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-781103554
# [Codecov](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=h1) Report
> Merging [#2581](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=desc) (0ebd0c7) into [master](https://codecov.io/gh/apache/hudi/commit/b0010bf3b449a9d2e01955b0746b795e22e577db?el=desc) (b0010bf) will **decrease** coverage by `41.46%`.
> The diff coverage is `n/a`.
```diff
@@ Coverage Diff @@
## master #2581 +/- ##
============================================
- Coverage 51.15% 9.68% -41.47%
+ Complexity 3212 48 -3164
============================================
Files 436 53 -383
Lines 19987 1931 -18056
Branches 2057 230 -1827
============================================
- Hits 10224 187 -10037
+ Misses 8922 1731 -7191
+ Partials 841 13 -828
```
| Flag | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| hudicli | `?` | `?` | |
| hudiclient | `?` | `?` | |
| hudicommon | `?` | `?` | |
| hudiflink | `?` | `?` | |
| hudihadoopmr | `?` | `?` | |
| hudisparkdatasource | `?` | `?` | |
| hudisync | `?` | `?` | |
| huditimelineservice | `?` | `?` | |
| hudiutilities | `9.68% <ø> (-59.71%)` | `0.00 <ø> (ø)` | |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
| [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
| [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
| [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
| [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
| [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-10.00%)` | |
| [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
| [...apache/hudi/utilities/sources/JsonKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkthZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-6.00%)` | |
| [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-5.00%)` | |
| [...lities/schema/SchemaProviderWithPostProcessor.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQcm92aWRlcldpdGhQb3N0UHJvY2Vzc29yLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
| ... and [410 more](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree-more) | |
----------------------------------------------------------------