Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/02/18 06:13:58 UTC

[GitHub] [hudi] danny0405 opened a new pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

danny0405 opened a new pull request #2581:
URL: https://github.com/apache/hudi/pull/2581


   … files
   
   ## What is the purpose of the pull request
   
   The index should bootstrap from the existing base files, if there are any.
   In this design, when `processElement` finds that a key does not exist in
   the index, we load all the keys of that key's partition. If the partition
   holds many records, the processing may block and trigger back pressure.
   Once all the records are loaded, we only need to check the state each
   time a record is tagged.
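
A minimal sketch of the lazy, per-partition load described above, written in plain Java rather than against Flink state. `LazyPartitionIndex`, `keysByPartition`, `locate`, and the `file-of-` ids are hypothetical names used for illustration only; the patch itself keeps the index in Flink state and reads the keys from base files.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the state-based index; not the Hudi implementation.
class LazyPartitionIndex {
  // Simulates the record keys stored in the existing base files of each partition.
  private final Map<String, List<String>> keysByPartition;
  // Simulates the index state: "partition/recordKey" -> file id.
  private final Map<String, String> indexState = new HashMap<>();
  // Partitions whose existing keys were already loaded into the index state.
  private final Set<String> loadedPartitions = new HashSet<>();
  // Counts how many times a partition was bootstrapped.
  int loadCount = 0;

  LazyPartitionIndex(Map<String, List<String>> keysByPartition) {
    this.keysByPartition = keysByPartition;
  }

  // Returns the known file id for the record, or null if the record is new.
  String locate(String partition, String recordKey) {
    if (!loadedPartitions.contains(partition)) {
      // First record seen for this partition: load all of its keys (may be slow).
      loadPartition(partition);
    }
    return indexState.get(partition + "/" + recordKey);
  }

  private void loadPartition(String partition) {
    List<String> keys = keysByPartition.getOrDefault(partition, Collections.emptyList());
    for (String key : keys) {
      indexState.put(partition + "/" + key, "file-of-" + partition);
    }
    loadedPartitions.add(partition);
    loadCount++;
  }
}
```

The first lookup for a partition pays the full load cost, which is where the blocking and back pressure mentioned above would come from; later lookups only touch the in-memory state.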
   
   ## Brief change log
   
     - Modify BucketAssignFunction to load existing records for indexing
   
   ## Verify this pull request
   
   Added UTs.
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] danny0405 commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r579947920



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
##########
@@ -37,6 +38,29 @@
  */
 public class HoodieIndexUtils {
 
+  /**
+   * Fetches Pair of partition path and {@link HoodieBaseFile}s for interested partitions.
+   *
+   * @param partition   Partition of interest
+   * @param context     Instance of {@link HoodieEngineContext} to use
+   * @param hoodieTable Instance of {@link HoodieTable} of interest
+   * @return the list of {@link HoodieBaseFile}
+   */
+  public static List<HoodieBaseFile> getLatestBaseFilesForPartition(

Review comment:
       I think we could.







[GitHub] [hudi] lamber-ken commented on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
lamber-ken commented on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-783053827


   Hi @danny0405, is it necessary to revert `this.bucketAssigner.reset();` back to `BucketAssignFunction#snapshotState` in this patch? : )





[GitHub] [hudi] lamber-ken commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
lamber-ken commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r579964461



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -78,13 +130,14 @@ public BucketAssignFunction(Configuration conf) {
   public void open(Configuration parameters) throws Exception {
     super.open(parameters);
     HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
-    HoodieFlinkEngineContext context =
-        new HoodieFlinkEngineContext(
-            new SerializableConfiguration(StreamerUtil.getHadoopConf()),
-            new FlinkTaskContextSupplier(getRuntimeContext()));
-    this.bucketAssigner = new BucketAssigner(
-        context,
-        writeConfig);
+    this.hadoopConf = StreamerUtil.getHadoopConf();
+    this.context = new HoodieFlinkEngineContext(
+        new SerializableConfiguration(this.hadoopConf),
+        new FlinkTaskContextSupplier(getRuntimeContext()));
+    this.bucketAssigner = new BucketAssigner(context, writeConfig);
+    final FileSystem fs = FSUtils.getFs(this.conf.getString(FlinkOptions.PATH), this.hadoopConf);

Review comment:
       `fs` is never used; we can remove it.







[GitHub] [hudi] garyli1019 commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
garyli1019 commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r579779589



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -146,5 +209,69 @@ public void notifyCheckpointComplete(long l) {
     // Refresh the table state when there are new commits.
     this.bucketAssigner.reset();
     this.bucketAssigner.refreshTable();
+    checkPartitionsLoaded();
+  }
+
+  /**
+   * Load all the indices of give partition path into the backup state.
+   *
+   * @param partitionPath The partition path
+   * @throws Exception when error occurs for state update
+   */
+  private void loadRecords(String partitionPath) throws Exception {
+    HoodieTable<?, ?, ?, ?> hoodieTable = bucketAssigner.getTable();
+    List<HoodieBaseFile> latestBaseFiles =
+        HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, context, hoodieTable);
+    for (HoodieBaseFile baseFile : latestBaseFiles) {
+      List<HoodieKey> hoodieKeys =
+          ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new Path(baseFile.getPath()));
+      hoodieKeys.forEach(hoodieKey -> {
+        try {
+          this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
+        } catch (Exception e) {
+          throw new HoodieIOException("Error when load record keys from file: " + baseFile);
+        }
+      });
+    }
+    // Mark the partition path as loaded.
+    partitionLoadState.put(partitionPath, 0);

Review comment:
       Maybe put a boolean here instead of `0`?

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -146,5 +209,69 @@ public void notifyCheckpointComplete(long l) {
     // Refresh the table state when there are new commits.
     this.bucketAssigner.reset();
     this.bucketAssigner.refreshTable();
+    checkPartitionsLoaded();
+  }
+
+  /**
+   * Load all the indices of give partition path into the backup state.
+   *
+   * @param partitionPath The partition path
+   * @throws Exception when error occurs for state update
+   */
+  private void loadRecords(String partitionPath) throws Exception {
+    HoodieTable<?, ?, ?, ?> hoodieTable = bucketAssigner.getTable();
+    List<HoodieBaseFile> latestBaseFiles =
+        HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, context, hoodieTable);
+    for (HoodieBaseFile baseFile : latestBaseFiles) {
+      List<HoodieKey> hoodieKeys =
+          ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new Path(baseFile.getPath()));
+      hoodieKeys.forEach(hoodieKey -> {
+        try {
+          this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
+        } catch (Exception e) {
+          throw new HoodieIOException("Error when load record keys from file: " + baseFile);
+        }
+      });
+    }
+    // Mark the partition path as loaded.
+    partitionLoadState.put(partitionPath, 0);
+  }
+
+  /**
+   * Checks whether all the partitions of the table are loaded into the state,
+   * set the flag {@code allPartitionsLoaded} to true if it is.
+   */
+  private void checkPartitionsLoaded() {
+    for (String partition : this.allPartitionPath) {
+      try {
+        if (!this.partitionLoadState.contains(partition)) {
+          return;
+        }
+      } catch (Exception e) {
+        LOG.warn("Error when check whether all partitions are loaded, ignored", e);
+        throw new HoodieException(e);
+      }
+    }
+    this.allPartitionsLoaded = true;
+  }

Review comment:
       IIUC, this check seems necessary because we don't update `partitionLoadState` when a new partition shows up in the upcoming records, so we need to check after each commit. Otherwise, we would need to update `partitionLoadState` together with `indexState`.
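
A rough sketch of the "check after each commit" option, in plain Java. `PartitionLoadTracker`, `markLoaded`, and `onCheckpointComplete` are hypothetical names; a plain `Set` stands in for the Flink state used in the diff.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical tracker; field names mirror the diff, but this is not the Hudi class.
class PartitionLoadTracker {
  private final List<String> allPartitionPaths;
  private final Set<String> partitionLoadState = new HashSet<>();
  private boolean allPartitionsLoaded = false;

  PartitionLoadTracker(List<String> allPartitionPaths) {
    this.allPartitionPaths = allPartitionPaths;
  }

  // Called once the keys of a partition have been loaded into the index.
  void markLoaded(String partitionPath) {
    partitionLoadState.add(partitionPath);
  }

  // Called after each completed checkpoint: sets the flag once every
  // known partition has been loaded, and never clears it again.
  void onCheckpointComplete() {
    if (partitionLoadState.containsAll(allPartitionPaths)) {
      allPartitionsLoaded = true;
    }
  }

  boolean isAllPartitionsLoaded() {
    return allPartitionsLoaded;
  }
}
```

The alternative mentioned in the comment would be to call something like `markLoaded` whenever `indexState` is updated for a new partition, which would make the periodic check unnecessary.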

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -78,13 +130,14 @@ public BucketAssignFunction(Configuration conf) {
   public void open(Configuration parameters) throws Exception {
     super.open(parameters);
     HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
-    HoodieFlinkEngineContext context =
-        new HoodieFlinkEngineContext(
-            new SerializableConfiguration(StreamerUtil.getHadoopConf()),
-            new FlinkTaskContextSupplier(getRuntimeContext()));
-    this.bucketAssigner = new BucketAssigner(
-        context,
-        writeConfig);
+    this.context = new HoodieFlinkEngineContext(
+        new SerializableConfiguration(StreamerUtil.getHadoopConf()),

Review comment:
       Can we use `this.hadoopConf` here? `getHadoopConf()` seems to be called twice.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
##########
@@ -37,6 +38,29 @@
  */
 public class HoodieIndexUtils {
 
+  /**
+   * Fetches Pair of partition path and {@link HoodieBaseFile}s for interested partitions.
+   *
+   * @param partition   Partition of interest
+   * @param context     Instance of {@link HoodieEngineContext} to use
+   * @param hoodieTable Instance of {@link HoodieTable} of interest
+   * @return the list of {@link HoodieBaseFile}
+   */
+  public static List<HoodieBaseFile> getLatestBaseFilesForPartition(

Review comment:
       Shall we use this in `getLatestBaseFilesForAllPartitions` to avoid duplicate code?
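
One possible shape of that reuse, sketched in plain Java. `BaseFileLookup`, the `lister` function, and the use of plain strings in place of `HoodieBaseFile` are assumptions made for illustration; the real methods take a `HoodieEngineContext` and a `HoodieTable`.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: strings stand in for HoodieBaseFile, a Function for the table's file listing.
class BaseFileLookup {

  // Single-partition helper: returns the latest base files of one partition.
  static List<String> latestBaseFilesForPartition(
      String partition, Function<String, List<String>> lister) {
    return lister.apply(partition);
  }

  // All-partitions variant that delegates to the single-partition helper
  // instead of repeating the lookup logic.
  static List<Map.Entry<String, String>> latestBaseFilesForAllPartitions(
      List<String> partitions, Function<String, List<String>> lister) {
    List<Map.Entry<String, String>> result = new ArrayList<>();
    for (String partition : partitions) {
      for (String baseFile : latestBaseFilesForPartition(partition, lister)) {
        result.add(new AbstractMap.SimpleImmutableEntry<>(partition, baseFile));
      }
    }
    return result;
  }
}
```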







[GitHub] [hudi] lamber-ken commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
lamber-ken commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r579066469



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -112,6 +171,10 @@ public void processElement(I value, Context ctx, Collector<O> out) throws Except
     final HoodieKey hoodieKey = record.getKey();
     final BucketInfo bucketInfo;
     final HoodieRecordLocation location;
+    if (!allPartitionsLoaded && !partitionLoadState.contains(hoodieKey.getPartitionPath())) {

Review comment:
       By the way, `allPartitionPath` is only initialized in the `BucketAssignFunction#open` method; it seems we also need to update `allPartitionPath`?










[GitHub] [hudi] hk-lrzy commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
hk-lrzy commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r581156455



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -78,13 +131,20 @@ public BucketAssignFunction(Configuration conf) {
   public void open(Configuration parameters) throws Exception {
     super.open(parameters);
     HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
-    HoodieFlinkEngineContext context =
-        new HoodieFlinkEngineContext(
-            new SerializableConfiguration(StreamerUtil.getHadoopConf()),
-            new FlinkTaskContextSupplier(getRuntimeContext()));
-    this.bucketAssigner = new BucketAssigner(
-        context,
-        writeConfig);
+    this.hadoopConf = StreamerUtil.getHadoopConf();
+    this.context = new HoodieFlinkEngineContext(
+        new SerializableConfiguration(this.hadoopConf),
+        new FlinkTaskContextSupplier(getRuntimeContext()));
+    this.bucketAssigner = new BucketAssigner(context, writeConfig);
+    List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(this.context,
+        this.conf.getString(FlinkOptions.PATH), false, false, false);
+    final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
+    final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
+    final int taskID = getRuntimeContext().getIndexOfThisSubtask();
+    // reference: org.apache.flink.streaming.api.datastream.KeyedStream
+    this.initialPartitionsToLoad = allPartitionPaths.stream()

Review comment:
       ```
       if (context.isRestored()) {
         checkPartitionsLoaded();
       }
       ```
       When restored from a checkpoint, `initialPartitionsToLoad` has not been initialized yet.
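
A small sketch of the ordering problem, assuming the restore check sits in `initializeState`, which Flink invokes before `open`. `RestoreOrderSketch` and its method signatures are hypothetical and only mimic that call order in plain Java; deferring the check to `open` is one possible fix, not necessarily the one taken in the patch.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical operator that mimics the call order initializeState() -> open().
class RestoreOrderSketch {
  private List<String> initialPartitionsToLoad;                    // assigned in open()
  private final Set<String> partitionLoadState = new HashSet<>();  // restored state
  private boolean restored;
  boolean allPartitionsLoaded;

  void initializeState(boolean isRestored, Set<String> restoredPartitions) {
    this.restored = isRestored;
    this.partitionLoadState.addAll(restoredPartitions);
    // Calling checkPartitionsLoaded() here would fail with a NullPointerException:
    // initialPartitionsToLoad is still null at this point.
  }

  void open(List<String> partitions) {
    this.initialPartitionsToLoad = partitions;
    if (restored) {
      checkPartitionsLoaded(); // safe now that the partition list is set
    }
  }

  private void checkPartitionsLoaded() {
    allPartitionsLoaded = partitionLoadState.containsAll(initialPartitionsToLoad);
  }
}
```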







[GitHub] [hudi] codecov-io edited a comment on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-781103554


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=h1) Report
   > Merging [#2581](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=desc) (af6d9b7) into [master](https://codecov.io/gh/apache/hudi/commit/43a0776c7c88a5f7beac6c8853db7e341810635a?el=desc) (43a0776) will **decrease** coverage by `41.45%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2581/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #2581       +/-   ##
   ============================================
   - Coverage     51.14%   9.69%   -41.46%     
   + Complexity     3215      48     -3167     
   ============================================
     Files           438      53      -385     
     Lines         20041    1929    -18112     
     Branches       2064     230     -1834     
   ============================================
   - Hits          10250     187    -10063     
   + Misses         8946    1729     -7217     
   + Partials        845      13      -832     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `?` | `?` | |
   | hudiclient | `?` | `?` | |
   | hudicommon | `?` | `?` | |
   | hudiflink | `?` | `?` | |
   | hudihadoopmr | `?` | `?` | |
   | hudisparkdatasource | `?` | `?` | |
   | hudisync | `?` | `?` | |
   | huditimelineservice | `?` | `?` | |
   | hudiutilities | `9.69% <ø> (-59.78%)` | `0.00 <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
   | [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-10.00%)` | |
   | [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [...apache/hudi/utilities/sources/JsonKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkthZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-6.00%)` | |
   | [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-5.00%)` | |
   | [...lities/schema/SchemaProviderWithPostProcessor.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQcm92aWRlcldpdGhQb3N0UHJvY2Vzc29yLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | ... and [412 more](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree-more) | |
   





[GitHub] [hudi] codecov-io edited a comment on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-781103554


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=h1) Report
   > Merging [#2581](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=desc) (ba652e8) into [master](https://codecov.io/gh/apache/hudi/commit/43a0776c7c88a5f7beac6c8853db7e341810635a?el=desc) (43a0776) will **increase** coverage by `0.03%`.
   > The diff coverage is `71.42%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2581/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #2581      +/-   ##
   ============================================
   + Coverage     51.14%   51.18%   +0.03%     
   - Complexity     3215     3226      +11     
   ============================================
     Files           438      438              
     Lines         20041    20090      +49     
     Branches       2064     2069       +5     
   ============================================
   + Hits          10250    10283      +33     
   - Misses         8946     8959      +13     
   - Partials        845      848       +3     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `36.87% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudiclient | `100.00% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudicommon | `51.35% <ø> (-0.01%)` | `0.00 <ø> (ø)` | |
   | hudiflink | `46.34% <71.42%> (+0.90%)` | `0.00 <10.00> (ø)` | |
   | hudihadoopmr | `33.16% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudisparkdatasource | `69.75% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudisync | `48.61% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | huditimelineservice | `66.49% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudiutilities | `69.46% <ø> (ø)` | `0.00 <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...ain/java/org/apache/hudi/avro/HoodieAvroUtils.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvYXZyby9Ib29kaWVBdnJvVXRpbHMuamF2YQ==) | `55.66% <ø> (-0.44%)` | `38.00 <0.00> (ø)` | |
   | [...udi/operator/partitioner/BucketAssignFunction.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1mbGluay9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaHVkaS9vcGVyYXRvci9wYXJ0aXRpb25lci9CdWNrZXRBc3NpZ25GdW5jdGlvbi5qYXZh) | `78.65% <70.90%> (-13.66%)` | `18.00 <9.00> (+10.00)` | :arrow_down: |
   | [...ache/hudi/operator/partitioner/BucketAssigner.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1mbGluay9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaHVkaS9vcGVyYXRvci9wYXJ0aXRpb25lci9CdWNrZXRBc3NpZ25lci5qYXZh) | `80.17% <100.00%> (+0.17%)` | `19.00 <1.00> (+1.00)` | |
   





[GitHub] [hudi] lamber-ken commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
lamber-ken commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r580040125



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
##########
@@ -37,6 +38,29 @@
  */
 public class HoodieIndexUtils {
 
+  /**
+   * Fetches Pair of partition path and {@link HoodieBaseFile}s for interested partitions.
+   *
+   * @param partition   Partition of interest
+   * @param context     Instance of {@link HoodieEngineContext} to use
+   * @param hoodieTable Instance of {@link HoodieTable} of interest
+   * @return the list of {@link HoodieBaseFile}
+   */
+  public static List<HoodieBaseFile> getLatestBaseFilesForPartition(

Review comment:
       Refer to `SparkHoodieBackedTableMetadataWriter#prepRecords`.







[GitHub] [hudi] lamber-ken commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
lamber-ken commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r579014347



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -112,6 +171,10 @@ public void processElement(I value, Context ctx, Collector<O> out) throws Except
     final HoodieKey hoodieKey = record.getKey();
     final BucketInfo bucketInfo;
     final HoodieRecordLocation location;
+    if (!allPartitionsLoaded && !partitionLoadState.contains(hoodieKey.getPartitionPath())) {

Review comment:
       Here, because of `&&`, the second condition is evaluated whenever `allPartitionsLoaded` is false.
   
   ![image](https://user-images.githubusercontent.com/20113411/108479498-c6dbf500-72d0-11eb-819a-5c6e06a38d0f.png)
   










[GitHub] [hudi] yanghua merged pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
yanghua merged pull request #2581:
URL: https://github.com/apache/hudi/pull/2581


   





[GitHub] [hudi] danny0405 commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r579002272



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -112,6 +171,10 @@ public void processElement(I value, Context ctx, Collector<O> out) throws Except
     final HoodieKey hoodieKey = record.getKey();
     final BucketInfo bucketInfo;
     final HoodieRecordLocation location;
+    if (!allPartitionsLoaded && !partitionLoadState.contains(hoodieKey.getPartitionPath())) {

Review comment:
       No, the `allPartitionsLoaded` flag is there to speed things up: once it is set, there is no need to query the state, which is not very efficient.
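
To illustrate the short-circuit behavior this relies on, here is a tiny plain-Java sketch. `ShortCircuitSketch`, `stateContains`, and `needsLoad` are hypothetical names; `stateContains` stands in for `partitionLoadState.contains(...)` and only counts how often it is called.

```java
// Hypothetical demo of how the flag avoids the state lookup.
class ShortCircuitSketch {
  // Counts how often the simulated state lookup runs.
  static int stateLookups = 0;

  // Stand-in for the state query; always reports "partition not loaded".
  static boolean stateContains(String partition) {
    stateLookups++;
    return false;
  }

  static boolean needsLoad(boolean allPartitionsLoaded, String partition) {
    // && evaluates the right operand only when the left operand is true,
    // so the state is queried only while allPartitionsLoaded is false.
    return !allPartitionsLoaded && !stateContains(partition);
  }
}
```

While the flag is false, every call performs one lookup; once it is true, `needsLoad` returns false without touching the state.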










[GitHub] [hudi] lamber-ken commented on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
lamber-ken commented on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-783929502


   Thanks @danny0405 for the base index patch. There may be some points to think about later. 👍
   
   





[GitHub] [hudi] yanghua commented on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
yanghua commented on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-783806234


   @danny0405 please check the CI?





[GitHub] [hudi] danny0405 commented on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
danny0405 commented on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-783069192


   > Hi @danny0405, is it necessary to revert `this.bucketAssigner.reset();` to `BucketAssignFunction#snapshotState` in this patch? : )
   
   I will do it in another patch, thanks.





[GitHub] [hudi] codecov-io edited a comment on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-781103554


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=h1) Report
   > Merging [#2581](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=desc) (813fa19) into [master](https://codecov.io/gh/apache/hudi/commit/43a0776c7c88a5f7beac6c8853db7e341810635a?el=desc) (43a0776) will **increase** coverage by `0.03%`.
   > The diff coverage is `72.00%`.
   
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #2581      +/-   ##
   ============================================
   + Coverage     51.14%   51.18%   +0.03%     
   - Complexity     3215     3226      +11     
   ============================================
     Files           438      438              
     Lines         20041    20084      +43     
     Branches       2064     2068       +4     
   ============================================
   + Hits          10250    10279      +29     
   - Misses         8946     8958      +12     
   - Partials        845      847       +2     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `36.87% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudiclient | `100.00% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudicommon | `51.35% <ø> (-0.01%)` | `0.00 <ø> (ø)` | |
   | hudiflink | `46.25% <72.00%> (+0.81%)` | `0.00 <10.00> (ø)` | |
   | hudihadoopmr | `33.16% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudisparkdatasource | `69.75% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudisync | `48.61% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | huditimelineservice | `66.49% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudiutilities | `69.46% <ø> (ø)` | `0.00 <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...ain/java/org/apache/hudi/avro/HoodieAvroUtils.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvYXZyby9Ib29kaWVBdnJvVXRpbHMuamF2YQ==) | `55.66% <ø> (-0.44%)` | `38.00 <0.00> (ø)` | |
   | [...udi/operator/partitioner/BucketAssignFunction.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1mbGluay9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaHVkaS9vcGVyYXRvci9wYXJ0aXRpb25lci9CdWNrZXRBc3NpZ25GdW5jdGlvbi5qYXZh) | `79.51% <71.42%> (-12.79%)` | `18.00 <9.00> (+10.00)` | :arrow_down: |
   | [...ache/hudi/operator/partitioner/BucketAssigner.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1mbGluay9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaHVkaS9vcGVyYXRvci9wYXJ0aXRpb25lci9CdWNrZXRBc3NpZ25lci5qYXZh) | `80.17% <100.00%> (+0.17%)` | `19.00 <1.00> (+1.00)` | |
   





[GitHub] [hudi] codecov-io edited a comment on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-781103554


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=h1) Report
   > Merging [#2581](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=desc) (813fa19) into [master](https://codecov.io/gh/apache/hudi/commit/43a0776c7c88a5f7beac6c8853db7e341810635a?el=desc) (43a0776) will **decrease** coverage by `41.45%`.
   > The diff coverage is `n/a`.
   
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #2581       +/-   ##
   ============================================
   - Coverage     51.14%   9.69%   -41.46%     
   + Complexity     3215      48     -3167     
   ============================================
     Files           438      53      -385     
     Lines         20041    1929    -18112     
     Branches       2064     230     -1834     
   ============================================
   - Hits          10250     187    -10063     
   + Misses         8946    1729     -7217     
   + Partials        845      13      -832     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `?` | `?` | |
   | hudiclient | `?` | `?` | |
   | hudicommon | `?` | `?` | |
   | hudiflink | `?` | `?` | |
   | hudihadoopmr | `?` | `?` | |
   | hudisparkdatasource | `?` | `?` | |
   | hudisync | `?` | `?` | |
   | huditimelineservice | `?` | `?` | |
   | hudiutilities | `9.69% <ø> (-59.78%)` | `0.00 <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
   | [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-10.00%)` | |
   | [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [...apache/hudi/utilities/sources/JsonKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkthZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-6.00%)` | |
   | [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-5.00%)` | |
   | [...lities/schema/SchemaProviderWithPostProcessor.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQcm92aWRlcldpdGhQb3N0UHJvY2Vzc29yLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | ... and [412 more](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree-more) | |
   





[GitHub] [hudi] codecov-io edited a comment on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-781103554


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=h1) Report
   > Merging [#2581](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=desc) (ba652e8) into [master](https://codecov.io/gh/apache/hudi/commit/43a0776c7c88a5f7beac6c8853db7e341810635a?el=desc) (43a0776) will **decrease** coverage by `1.83%`.
   > The diff coverage is `71.42%`.
   
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #2581      +/-   ##
   ============================================
   - Coverage     51.14%   49.31%   -1.84%     
   + Complexity     3215     2848     -367     
   ============================================
     Files           438      381      -57     
     Lines         20041    17102    -2939     
     Branches       2064     1734     -330     
   ============================================
   - Hits          10250     8433    -1817     
   + Misses         8946     8000     -946     
   + Partials        845      669     -176     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `36.87% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudiclient | `100.00% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudicommon | `51.35% <ø> (-0.01%)` | `0.00 <ø> (ø)` | |
   | hudiflink | `46.34% <71.42%> (+0.90%)` | `0.00 <10.00> (ø)` | |
   | hudihadoopmr | `33.16% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudisparkdatasource | `?` | `?` | |
   | hudisync | `?` | `?` | |
   | huditimelineservice | `?` | `?` | |
   | hudiutilities | `69.46% <ø> (ø)` | `0.00 <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...ain/java/org/apache/hudi/avro/HoodieAvroUtils.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvYXZyby9Ib29kaWVBdnJvVXRpbHMuamF2YQ==) | `55.66% <ø> (-0.44%)` | `38.00 <0.00> (ø)` | |
   | [...udi/operator/partitioner/BucketAssignFunction.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1mbGluay9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaHVkaS9vcGVyYXRvci9wYXJ0aXRpb25lci9CdWNrZXRBc3NpZ25GdW5jdGlvbi5qYXZh) | `78.65% <70.90%> (-13.66%)` | `18.00 <9.00> (+10.00)` | :arrow_down: |
   | [...ache/hudi/operator/partitioner/BucketAssigner.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1mbGluay9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaHVkaS9vcGVyYXRvci9wYXJ0aXRpb25lci9CdWNrZXRBc3NpZ25lci5qYXZh) | `80.17% <100.00%> (+0.17%)` | `19.00 <1.00> (+1.00)` | |
   | [...udi/timeline/service/handlers/TimelineHandler.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS10aW1lbGluZS1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9odWRpL3RpbWVsaW5lL3NlcnZpY2UvaGFuZGxlcnMvVGltZWxpbmVIYW5kbGVyLmphdmE=) | | | |
   | [.../hive/SlashEncodedHourPartitionValueExtractor.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1zeW5jL2h1ZGktaGl2ZS1zeW5jL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9odWRpL2hpdmUvU2xhc2hFbmNvZGVkSG91clBhcnRpdGlvblZhbHVlRXh0cmFjdG9yLmphdmE=) | | | |
   | [.../hudi/internal/HoodieDataSourceInternalWriter.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3BhcmsyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9odWRpL2ludGVybmFsL0hvb2RpZURhdGFTb3VyY2VJbnRlcm5hbFdyaXRlci5qYXZh) | | | |
   | [...main/java/org/apache/hudi/hive/HiveSyncConfig.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1zeW5jL2h1ZGktaGl2ZS1zeW5jL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9odWRpL2hpdmUvSGl2ZVN5bmNDb25maWcuamF2YQ==) | | | |
   | [...n/java/org/apache/hudi/internal/DefaultSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3BhcmsyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9odWRpL2ludGVybmFsL0RlZmF1bHRTb3VyY2UuamF2YQ==) | | | |
   | [...i/hive/SlashEncodedDayPartitionValueExtractor.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1zeW5jL2h1ZGktaGl2ZS1zeW5jL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9odWRpL2hpdmUvU2xhc2hFbmNvZGVkRGF5UGFydGl0aW9uVmFsdWVFeHRyYWN0b3IuamF2YQ==) | | | |
   | [...i/internal/HoodieBulkInsertDataInternalWriter.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay1kYXRhc291cmNlL2h1ZGktc3BhcmsyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9odWRpL2ludGVybmFsL0hvb2RpZUJ1bGtJbnNlcnREYXRhSW50ZXJuYWxXcml0ZXIuamF2YQ==) | | | |
   | ... and [50 more](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree-more) | |
   





[GitHub] [hudi] lamber-ken commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
lamber-ken commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r580736398



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -78,13 +131,22 @@ public BucketAssignFunction(Configuration conf) {
   public void open(Configuration parameters) throws Exception {
     super.open(parameters);
     HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
-    HoodieFlinkEngineContext context =
-        new HoodieFlinkEngineContext(
-            new SerializableConfiguration(StreamerUtil.getHadoopConf()),
-            new FlinkTaskContextSupplier(getRuntimeContext()));
-    this.bucketAssigner = new BucketAssigner(
-        context,
-        writeConfig);
+    this.hadoopConf = StreamerUtil.getHadoopConf();
+    this.context = new HoodieFlinkEngineContext(
+        new SerializableConfiguration(this.hadoopConf),
+        new FlinkTaskContextSupplier(getRuntimeContext()));
+    this.bucketAssigner = new BucketAssigner(context, writeConfig);
+    List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(this.context,
+        this.conf.getString(FlinkOptions.PATH), false, false, false);
+    final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
+    final int maxParallelism = getRuntimeContext().getExecutionConfig().getMaxParallelism() == -1
+        ? KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM
+        : getRuntimeContext().getExecutionConfig().getMaxParallelism();

Review comment:
       We can use `getRuntimeContext().getMaxNumberOfParallelSubtasks()` here instead.
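
For readers following the `maxParallelism` discussion: the hunk's code comment points at `KeyedStream`, and the stream expression that follows it is cut off in the quoted diff. A minimal sketch of the idea, assuming each subtask keeps only the partition paths whose key group maps to its own index, might look like this. The class and method names are hypothetical, and `keyGroupFor` uses `String.hashCode()` as a stand-in for the murmur hash that Flink applies; only the `keyGroup * parallelism / maxParallelism` step mirrors how Flink maps a key group to an operator index.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of key-group based routing; these are not Flink's actual classes. */
class SubtaskAssignmentSketch {

  /** Stand-in for Flink's hash of the key (assumption for illustration only). */
  static int keyGroupFor(String key, int maxParallelism) {
    return Math.floorMod(key.hashCode(), maxParallelism);
  }

  /** Maps a key group to a subtask index: keyGroup * parallelism / maxParallelism. */
  static int subtaskFor(int keyGroup, int parallelism, int maxParallelism) {
    return keyGroup * parallelism / maxParallelism;
  }

  /** Returns the partition paths that the given subtask would be responsible for. */
  static List<String> partitionsForSubtask(
      List<String> allPartitions, int parallelism, int maxParallelism, int taskId) {
    List<String> result = new ArrayList<>();
    for (String partition : allPartitions) {
      int keyGroup = keyGroupFor(partition, maxParallelism);
      if (subtaskFor(keyGroup, parallelism, maxParallelism) == taskId) {
        result.add(partition);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> partitions = Arrays.asList("par1", "par2", "par3", "par4");
    for (int task = 0; task < 2; task++) {
      System.out.println(task + " -> " + partitionsForSubtask(partitions, 2, 128, task));
    }
  }
}
```

Every key group maps to exactly one subtask index, so each partition path is claimed by exactly one subtask regardless of the hash used. This is presumably why the `maxParallelism` value has to agree with the one the runtime uses for keyed routing.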







[GitHub] [hudi] danny0405 commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r579915210



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -146,5 +209,69 @@ public void notifyCheckpointComplete(long l) {
     // Refresh the table state when there are new commits.
     this.bucketAssigner.reset();
     this.bucketAssigner.refreshTable();
+    checkPartitionsLoaded();
+  }
+
+  /**
+   * Load all the indices of give partition path into the backup state.
+   *
+   * @param partitionPath The partition path
+   * @throws Exception when error occurs for state update
+   */
+  private void loadRecords(String partitionPath) throws Exception {
+    HoodieTable<?, ?, ?, ?> hoodieTable = bucketAssigner.getTable();
+    List<HoodieBaseFile> latestBaseFiles =
+        HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, context, hoodieTable);
+    for (HoodieBaseFile baseFile : latestBaseFiles) {
+      List<HoodieKey> hoodieKeys =
+          ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new Path(baseFile.getPath()));
+      hoodieKeys.forEach(hoodieKey -> {
+        try {
+          this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
+        } catch (Exception e) {
+          throw new HoodieIOException("Error when load record keys from file: " + baseFile);
+        }
+      });
+    }
+    // Mark the partition path as loaded.
+    partitionLoadState.put(partitionPath, 0);

Review comment:
       > `The 0 is meaningless here`
   
   The value is meaningless either way, because Flink does not have a Set state; the map state is used as a set, so only the key matters.
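
The exchange above is about using a map as a set. A minimal sketch of that idiom follows, with a plain `HashMap` standing in for Flink's `MapState` and a hypothetical named constant replacing the bare `0`; none of these names come from the PR.

```java
import java.util.HashMap;
import java.util.Map;

/** Uses a map as a set, the way a MapState<String, Integer> can stand in for set state. */
class LoadedPartitionsSketch {
  /** Placeholder value; only the presence of the key carries meaning. */
  static final Integer PRESENT = 0;

  // Plain HashMap as a stand-in for Flink's MapState (assumption for illustration).
  private final Map<String, Integer> loaded = new HashMap<>();

  /** Records that the index of the given partition has been loaded. */
  void markLoaded(String partitionPath) {
    loaded.put(partitionPath, PRESENT);
  }

  /** Membership test: the stored value is never read. */
  boolean isLoaded(String partitionPath) {
    return loaded.containsKey(partitionPath);
  }

  public static void main(String[] args) {
    LoadedPartitionsSketch state = new LoadedPartitionsSketch();
    state.markLoaded("par1");
    System.out.println(state.isLoaded("par1") + " " + state.isLoaded("par2"));
  }
}
```

Naming the placeholder is one way to address the readability concern without changing behavior.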







[GitHub] [hudi] lamber-ken commented on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
lamber-ken commented on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-781192449


   > @lamber-ken You can also help to review this PR, if you would like to do it.
   
   Yeah. By the way, big thanks to @danny0405 for RFC-24 👍 





[GitHub] [hudi] lamber-ken commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
lamber-ken commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r579017041



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -146,5 +209,69 @@ public void notifyCheckpointComplete(long l) {
     // Refresh the table state when there are new commits.
     this.bucketAssigner.reset();
     this.bucketAssigner.refreshTable();
+    checkPartitionsLoaded();
+  }
+
+  /**
+   * Load all the indices of give partition path into the backup state.
+   *
+   * @param partitionPath The partition path
+   * @throws Exception when error occurs for state update
+   */
+  private void loadRecords(String partitionPath) throws Exception {
+    HoodieTable<?, ?, ?, ?> hoodieTable = bucketAssigner.getTable();
+    List<HoodieBaseFile> latestBaseFiles =
+        HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, context, hoodieTable);
+    for (HoodieBaseFile baseFile : latestBaseFiles) {
+      List<HoodieKey> hoodieKeys =
+          ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new Path(baseFile.getPath()));
+      hoodieKeys.forEach(hoodieKey -> {
+        try {
+          this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
+        } catch (Exception e) {
+          throw new HoodieIOException("Error when load record keys from file: " + baseFile);
+        }
+      });
+    }
+    // Mark the partition path as loaded.
+    partitionLoadState.put(partitionPath, 0);

Review comment:
       The `0` is meaningless here; it may not be intuitive for beginners.
   
   







[GitHub] [hudi] hk-lrzy commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
hk-lrzy commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r581232576



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -100,6 +160,12 @@ public void initializeState(FunctionInitializationContext context) {
             TypeInformation.of(HoodieKey.class),
             TypeInformation.of(HoodieRecordLocation.class));
     indexState = context.getKeyedStateStore().getMapState(indexStateDesc);
+    MapStateDescriptor<String, Integer> partitionLoadStateDesc =
+        new MapStateDescriptor<>("partitionLoadState", Types.STRING, Types.INT);
+    partitionLoadState = context.getKeyedStateStore().getMapState(partitionLoadStateDesc);

Review comment:
       As I recall, `partitionLoadState` is keyed state rather than operator state, so it cannot be accessed in `initializeState` or `notifyCheckpointComplete`.
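
The concern can be illustrated with a toy model of keyed state, written in plain Java rather than against the Flink API. It assumes, as keyed state does in Flink, that every access is scoped to the key of the record currently being processed; the class, its methods, and the exception type are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of keyed state: every read and write is scoped to the current key. */
class KeyedStateScopeSketch {
  // One independent map per key, mimicking per-key scoping of keyed state.
  private final Map<String, Map<String, Integer>> statePerKey = new HashMap<>();

  // In a real job the runtime sets this before each element is processed.
  private String currentKey;

  void setCurrentKey(String key) {
    this.currentKey = key;
  }

  /** Returns the map for the current key, or fails when there is no keyed context. */
  private Map<String, Integer> scoped() {
    if (currentKey == null) {
      throw new IllegalStateException("No key set; keyed state needs a keyed context");
    }
    return statePerKey.computeIfAbsent(currentKey, k -> new HashMap<>());
  }

  void put(String name, int value) {
    scoped().put(name, value);
  }

  boolean contains(String name) {
    return scoped().containsKey(name);
  }

  public static void main(String[] args) {
    KeyedStateScopeSketch state = new KeyedStateScopeSketch();
    state.setCurrentKey("key-a");
    state.put("par1", 0);
    state.setCurrentKey("key-b");
    System.out.println(state.contains("par1"));
  }
}
```

In this model, a lifecycle callback that runs without a current key cannot touch the state at all, and a partition marked as loaded under one key is not visible under another.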







[GitHub] [hudi] danny0405 commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r579002846



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -146,5 +209,69 @@ public void notifyCheckpointComplete(long l) {
     // Refresh the table state when there are new commits.
     this.bucketAssigner.reset();
     this.bucketAssigner.refreshTable();
+    checkPartitionsLoaded();
+  }
+
+  /**
+   * Load all the indices of give partition path into the backup state.
+   *
+   * @param partitionPath The partition path
+   * @throws Exception when error occurs for state update
+   */
+  private void loadRecords(String partitionPath) throws Exception {
+    HoodieTable<?, ?, ?, ?> hoodieTable = bucketAssigner.getTable();
+    List<HoodieBaseFile> latestBaseFiles =
+        HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, context, hoodieTable);
+    for (HoodieBaseFile baseFile : latestBaseFiles) {
+      List<HoodieKey> hoodieKeys =
+          ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new Path(baseFile.getPath()));
+      hoodieKeys.forEach(hoodieKey -> {
+        try {
+          this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
+        } catch (Exception e) {
+          throw new HoodieIOException("Error when load record keys from file: " + baseFile);
+        }
+      });
+    }
+    // Mark the partition path as loaded.
+    partitionLoadState.put(partitionPath, 0);

Review comment:
       It is okay because only one code snippet uses it.







[GitHub] [hudi] yanghua commented on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
yanghua commented on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-781141632


   @lamber-ken You can also help to review this PR, if you would like to do it.





[GitHub] [hudi] danny0405 commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r580010866



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -112,6 +171,10 @@ public void processElement(I value, Context ctx, Collector<O> out) throws Except
     final HoodieKey hoodieKey = record.getKey();
     final BucketInfo bucketInfo;
     final HoodieRecordLocation location;
+    if (!allPartitionsLoaded && !partitionLoadState.contains(hoodieKey.getPartitionPath())) {

Review comment:
       We only need to ensure that the initial partitions are loaded successfully. If new data partitions appear, the incoming records themselves will trigger the index update.
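
A rough sketch of the load-on-first-encounter check discussed here, using plain Java collections instead of Flink state and Hudi base files. All names are hypothetical, and the `existingFiles` map is an assumed stand-in for the keys read from the latest base files of each partition.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Loads the index of a partition the first time a record for it is seen. */
class LazyIndexLoadSketch {
  private final Set<String> loadedPartitions = new HashSet<>();
  private final Map<String, String> index = new HashMap<>(); // "partition/recordKey" -> fileId
  private final Map<String, Map<String, String>> existingFiles; // partition -> (recordKey -> fileId)
  int loadCount = 0; // number of partition loads, exposed for demonstration

  LazyIndexLoadSketch(Map<String, Map<String, String>> existingFiles) {
    this.existingFiles = existingFiles;
  }

  /** Returns the file id that already holds the record, or null for a new record. */
  String lookup(String partitionPath, String recordKey) {
    if (!loadedPartitions.contains(partitionPath)) {
      loadPartition(partitionPath); // potentially expensive, done once per partition
    }
    return index.get(partitionPath + "/" + recordKey);
  }

  private void loadPartition(String partitionPath) {
    Map<String, String> keys =
        existingFiles.getOrDefault(partitionPath, Collections.emptyMap());
    keys.forEach((recordKey, fileId) -> index.put(partitionPath + "/" + recordKey, fileId));
    loadedPartitions.add(partitionPath); // mark as loaded even when the partition is empty
    loadCount++;
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> files = new HashMap<>();
    files.put("par1", Collections.singletonMap("id1", "file-1"));
    LazyIndexLoadSketch sketch = new LazyIndexLoadSketch(files);
    System.out.println(sketch.lookup("par1", "id1") + ", loads=" + sketch.loadCount);
  }
}
```

Once a partition is marked as loaded, later records only hit the in-memory lookup. A partition that did not exist at startup is simply loaded (as empty) the first time a record for it arrives.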







[GitHub] [hudi] lamber-ken commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
lamber-ken commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r580038941



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
##########
@@ -37,6 +38,29 @@
  */
 public class HoodieIndexUtils {
 
+  /**
+   * Fetches Pair of partition path and {@link HoodieBaseFile}s for interested partitions.
+   *
+   * @param partition   Partition of interest
+   * @param context     Instance of {@link HoodieEngineContext} to use
+   * @param hoodieTable Instance of {@link HoodieTable} of interest
+   * @return the list of {@link HoodieBaseFile}
+   */
+  public static List<HoodieBaseFile> getLatestBaseFilesForPartition(

Review comment:
       It's good that `getLatestBaseFilesForPartition` was extracted from `getLatestBaseFilesForAllPartitions`.  
   
   Current codebase: 
   ```
     public static List<HoodieBaseFile> getLatestBaseFilesForPartition(
         final String partition,
         final HoodieTable hoodieTable) {
       Option<HoodieInstant> latestCommitTime = hoodieTable.getMetaClient().getCommitsTimeline()
           .filterCompletedInstants().lastInstant();
       if (latestCommitTime.isPresent()) {
         return hoodieTable.getBaseFileOnlyView()
             .getLatestBaseFilesBeforeOrOn(partition, latestCommitTime.get().getTimestamp())
             .collect(toList());
       }
       return Collections.emptyList();
     }
   ```
   
   Maybe the following implementation is more efficient:
   ```
     public static List<HoodieBaseFile> getLatestBaseFilesForPartition(
         final String partition,
         final HoodieTable hoodieTable) {
       return hoodieTable.getFileSystemView()
           .getAllFileGroups(partition)
           .map(HoodieFileGroup::getLatestDataFile)
           .filter(Option::isPresent)
           .map(Option::get)
           .collect(toList());
     }
   ```
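
For readers unfamiliar with the file-group model, the suggestion above amounts to picking the newest base file in each file group. A minimal, self-contained sketch of that selection follows. It uses hypothetical stand-in types (`BaseFile`, `latestPerFileGroup`) rather than Hudi's real classes, and it does not model the commit timeline at all, so it is an illustration of the idea and not the proposed patch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class LatestBaseFileSketch {

  /** Hypothetical stand-in for a base file: a file group id plus the commit time that wrote it. */
  static final class BaseFile {
    final String fileId;
    final String commitTime;

    BaseFile(String fileId, String commitTime) {
      this.fileId = fileId;
      this.commitTime = commitTime;
    }
  }

  /** Keeps, for every file group, only the base file with the greatest commit time. */
  static List<BaseFile> latestPerFileGroup(List<BaseFile> files) {
    Map<String, BaseFile> latest = new LinkedHashMap<>();
    for (BaseFile f : files) {
      // Assumption: commit times are fixed-width timestamp strings,
      // so lexicographic order equals chronological order.
      latest.merge(f.fileId, f, (a, b) -> a.commitTime.compareTo(b.commitTime) >= 0 ? a : b);
    }
    return new ArrayList<>(latest.values());
  }

  public static void main(String[] args) {
    List<BaseFile> files = new ArrayList<>();
    files.add(new BaseFile("f1", "20210218060000"));
    files.add(new BaseFile("f1", "20210219000000"));
    files.add(new BaseFile("f2", "20210217000000"));
    for (BaseFile f : latestPerFileGroup(files)) {
      System.out.println(f.fileId + " @ " + f.commitTime);
    }
  }
}
```

Whether dropping the instant comparison is safe in the real code depends on which instants the file-system view already takes into account, which this sketch leaves out.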







[GitHub] [hudi] danny0405 commented on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
danny0405 commented on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-783819474


   > @danny0405 please check the CI?
   
   This should not be caused by this PR; re-triggering CI to run the tests again.





[GitHub] [hudi] danny0405 commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r580094188



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
##########
@@ -37,6 +38,29 @@
  */
 public class HoodieIndexUtils {
 
+  /**
+   * Fetches Pair of partition path and {@link HoodieBaseFile}s for interested partitions.
+   *
+   * @param partition   Partition of interest
+   * @param context     Instance of {@link HoodieEngineContext} to use
+   * @param hoodieTable Instance of {@link HoodieTable} of interest
+   * @return the list of {@link HoodieBaseFile}
+   */
+  public static List<HoodieBaseFile> getLatestBaseFilesForPartition(

Review comment:
       Agreed, there is no need to look up and compare the instant time here, but I would rather not make that change in this PR because it is unrelated.
   
   You can propose it in a separate JIRA issue.







[GitHub] [hudi] codecov-io edited a comment on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-781103554


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=h1) Report
   > Merging [#2581](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=desc) (ba652e8) into [master](https://codecov.io/gh/apache/hudi/commit/43a0776c7c88a5f7beac6c8853db7e341810635a?el=desc) (43a0776) will **increase** coverage by `0.03%`.
   > The diff coverage is `71.42%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2581/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #2581      +/-   ##
   ============================================
   + Coverage     51.14%   51.18%   +0.03%     
   - Complexity     3215     3226      +11     
   ============================================
     Files           438      438              
     Lines         20041    20090      +49     
     Branches       2064     2069       +5     
   ============================================
   + Hits          10250    10283      +33     
   - Misses         8946     8959      +13     
   - Partials        845      848       +3     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `36.87% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudiclient | `100.00% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudicommon | `51.35% <ø> (-0.01%)` | `0.00 <ø> (ø)` | |
   | hudiflink | `46.34% <71.42%> (+0.90%)` | `0.00 <10.00> (ø)` | |
   | hudihadoopmr | `33.16% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudisparkdatasource | `69.75% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudisync | `48.61% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | huditimelineservice | `66.49% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudiutilities | `69.46% <ø> (ø)` | `0.00 <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...ain/java/org/apache/hudi/avro/HoodieAvroUtils.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvYXZyby9Ib29kaWVBdnJvVXRpbHMuamF2YQ==) | `55.66% <ø> (-0.44%)` | `38.00 <0.00> (ø)` | |
   | [...udi/operator/partitioner/BucketAssignFunction.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1mbGluay9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaHVkaS9vcGVyYXRvci9wYXJ0aXRpb25lci9CdWNrZXRBc3NpZ25GdW5jdGlvbi5qYXZh) | `78.65% <70.90%> (-13.66%)` | `18.00 <9.00> (+10.00)` | :arrow_down: |
   | [...ache/hudi/operator/partitioner/BucketAssigner.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS1mbGluay9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaHVkaS9vcGVyYXRvci9wYXJ0aXRpb25lci9CdWNrZXRBc3NpZ25lci5qYXZh) | `80.17% <100.00%> (+0.17%)` | `19.00 <1.00> (+1.00)` | |
   





[GitHub] [hudi] danny0405 commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r581553993



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -78,13 +131,20 @@ public BucketAssignFunction(Configuration conf) {
   public void open(Configuration parameters) throws Exception {
     super.open(parameters);
     HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
-    HoodieFlinkEngineContext context =
-        new HoodieFlinkEngineContext(
-            new SerializableConfiguration(StreamerUtil.getHadoopConf()),
-            new FlinkTaskContextSupplier(getRuntimeContext()));
-    this.bucketAssigner = new BucketAssigner(
-        context,
-        writeConfig);
+    this.hadoopConf = StreamerUtil.getHadoopConf();
+    this.context = new HoodieFlinkEngineContext(
+        new SerializableConfiguration(this.hadoopConf),
+        new FlinkTaskContextSupplier(getRuntimeContext()));
+    this.bucketAssigner = new BucketAssigner(context, writeConfig);
+    List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(this.context,
+        this.conf.getString(FlinkOptions.PATH), false, false, false);
+    final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
+    final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
+    final int taskID = getRuntimeContext().getIndexOfThisSubtask();
+    // reference: org.apache.flink.streaming.api.datastream.KeyedStream
+    this.initialPartitionsToLoad = allPartitionPaths.stream()

Review comment:
       Yes, you are welcome to submit a fix and add test cases.
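
The hunk above has each subtask decide, in `open`, which partitions it should preload, using its subtask index, the parallelism and the max parallelism. The statement itself is truncated in the quoted diff, so the following is only a rough sketch of one plausible filter of that kind. The class and method names are hypothetical, and the hash is a simplified stand-in: Flink's keyed-stream routing applies an additional murmur hash to `hashCode()` before mapping a key to a key group, so the concrete assignments here will not match a real job.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class PartitionAssignmentSketch {

  /** Maps a key to a key group in [0, maxParallelism). Simplified: no murmur hash. */
  static int keyGroupFor(String key, int maxParallelism) {
    return Math.floorMod(key.hashCode(), maxParallelism);
  }

  /** Maps a key group to a subtask index in [0, parallelism). */
  static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
    return keyGroup * parallelism / maxParallelism;
  }

  /** Returns the partitions that would be routed to the given subtask. */
  static List<String> partitionsForSubtask(
      List<String> allPartitions, int maxParallelism, int parallelism, int taskId) {
    return allPartitions.stream()
        .filter(p -> subtaskFor(keyGroupFor(p, maxParallelism), maxParallelism, parallelism) == taskId)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> partitions = Arrays.asList("2021/02/16", "2021/02/17", "2021/02/18");
    for (int taskId = 0; taskId < 2; taskId++) {
      System.out.println(taskId + " -> " + partitionsForSubtask(partitions, 128, 2, taskId));
    }
  }
}
```

The point of mirroring the keyed-stream routing is that a subtask only preloads partitions whose records it would actually receive; every partition ends up with exactly one subtask.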







[GitHub] [hudi] danny0405 commented on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
danny0405 commented on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-783951593


   > Thanks @danny0405 for the base index patch, maybe there are some points to think about later, 👍
   
   Thanks! If you have new ideas, contributions are welcome.





[GitHub] [hudi] danny0405 commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r579977745



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -78,13 +130,14 @@ public BucketAssignFunction(Configuration conf) {
   public void open(Configuration parameters) throws Exception {
     super.open(parameters);
     HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
-    HoodieFlinkEngineContext context =
-        new HoodieFlinkEngineContext(
-            new SerializableConfiguration(StreamerUtil.getHadoopConf()),
-            new FlinkTaskContextSupplier(getRuntimeContext()));
-    this.bucketAssigner = new BucketAssigner(
-        context,
-        writeConfig);
+    this.hadoopConf = StreamerUtil.getHadoopConf();
+    this.context = new HoodieFlinkEngineContext(
+        new SerializableConfiguration(this.hadoopConf),
+        new FlinkTaskContextSupplier(getRuntimeContext()));
+    this.bucketAssigner = new BucketAssigner(context, writeConfig);
+    final FileSystem fs = FSUtils.getFs(this.conf.getString(FlinkOptions.PATH), this.hadoopConf);

Review comment:
       Already removed







[GitHub] [hudi] lamber-ken commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
lamber-ken commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r578893011



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -146,5 +209,69 @@ public void notifyCheckpointComplete(long l) {
     // Refresh the table state when there are new commits.
     this.bucketAssigner.reset();
     this.bucketAssigner.refreshTable();
+    checkPartitionsLoaded();
+  }
+
+  /**
+   * Load all the indices of give partition path into the backup state.
+   *
+   * @param partitionPath The partition path
+   * @throws Exception when error occurs for state update
+   */
+  private void loadRecords(String partitionPath) throws Exception {
+    HoodieTable<?, ?, ?, ?> hoodieTable = bucketAssigner.getTable();
+    List<HoodieBaseFile> latestBaseFiles =
+        HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, context, hoodieTable);
+    for (HoodieBaseFile baseFile : latestBaseFiles) {
+      List<HoodieKey> hoodieKeys =
+          ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new Path(baseFile.getPath()));
+      hoodieKeys.forEach(hoodieKey -> {
+        try {
+          this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
+        } catch (Exception e) {
+          throw new HoodieIOException("Error when load record keys from file: " + baseFile);
+        }
+      });
+    }
+    // Mark the partition path as loaded.
+    partitionLoadState.put(partitionPath, 0);

Review comment:
       It would be better to use a named static constant here instead of the literal `0`.
   
   

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -112,6 +171,10 @@ public void processElement(I value, Context ctx, Collector<O> out) throws Except
     final HoodieKey hoodieKey = record.getKey();
     final BucketInfo bucketInfo;
     final HoodieRecordLocation location;
+    if (!allPartitionsLoaded && !partitionLoadState.contains(hoodieKey.getPartitionPath())) {

Review comment:
       The `allPartitionsLoaded` member variable seems redundant; can we use only `partitionLoadState`?

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -146,5 +209,69 @@ public void notifyCheckpointComplete(long l) {
     // Refresh the table state when there are new commits.
     this.bucketAssigner.reset();
     this.bucketAssigner.refreshTable();
+    checkPartitionsLoaded();
+  }
+
+  /**
+   * Load all the indices of give partition path into the backup state.
+   *
+   * @param partitionPath The partition path
+   * @throws Exception when error occurs for state update
+   */
+  private void loadRecords(String partitionPath) throws Exception {
+    HoodieTable<?, ?, ?, ?> hoodieTable = bucketAssigner.getTable();
+    List<HoodieBaseFile> latestBaseFiles =
+        HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, context, hoodieTable);
+    for (HoodieBaseFile baseFile : latestBaseFiles) {
+      List<HoodieKey> hoodieKeys =
+          ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new Path(baseFile.getPath()));
+      hoodieKeys.forEach(hoodieKey -> {
+        try {
+          this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
+        } catch (Exception e) {
+          throw new HoodieIOException("Error when load record keys from file: " + baseFile);
+        }
+      });
+    }
+    // Mark the partition path as loaded.
+    partitionLoadState.put(partitionPath, 0);
+  }
+
+  /**
+   * Checks whether all the partitions of the table are loaded into the state,
+   * set the flag {@code allPartitionsLoaded} to true if it is.
+   */
+  private void checkPartitionsLoaded() {
+    for (String partition : this.allPartitionPath) {
+      try {
+        if (!this.partitionLoadState.contains(partition)) {
+          return;
+        }
+      } catch (Exception e) {
+        LOG.warn("Error when check whether all partitions are loaded, ignored", e);
+        throw new HoodieException(e);
+      }
+    }
+    this.allPartitionsLoaded = true;
+  }

Review comment:
       The `checkPartitionsLoaded()` method seems redundant, as does `allPartitionsLoaded`.
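
One way to read the suggestions above is to derive "all partitions loaded" from the per-partition state instead of caching it in a separate field. The sketch below assumes a plain `HashSet` as a stand-in for Flink's `MapState`, and the class and method names are hypothetical. Using a set also removes the need for a placeholder value such as `0`.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class PartitionLoadTrackerSketch {
  private final Set<String> allPartitions;
  private final Set<String> loaded = new HashSet<>();

  PartitionLoadTrackerSketch(Set<String> allPartitions) {
    this.allPartitions = new HashSet<>(allPartitions);
  }

  /** Returns true the first time a partition is seen, i.e. when the caller should load it. */
  boolean markLoadedIfAbsent(String partition) {
    return loaded.add(partition);
  }

  /** Derived on demand, so there is no separate flag to keep in sync. */
  boolean allLoaded() {
    return loaded.containsAll(allPartitions);
  }

  public static void main(String[] args) {
    PartitionLoadTrackerSketch tracker =
        new PartitionLoadTrackerSketch(new HashSet<>(Arrays.asList("p1", "p2")));
    System.out.println(tracker.markLoadedIfAbsent("p1"));
    System.out.println(tracker.allLoaded());
  }
}
```

A cached flag may still be worthwhile if a state lookup per record is expensive; this sketch does not measure that trade-off.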







[GitHub] [hudi] codecov-io edited a comment on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-781103554









[GitHub] [hudi] codecov-io commented on pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#issuecomment-781103554


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=h1) Report
   > Merging [#2581](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=desc) (0ebd0c7) into [master](https://codecov.io/gh/apache/hudi/commit/b0010bf3b449a9d2e01955b0746b795e22e577db?el=desc) (b0010bf) will **decrease** coverage by `41.46%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2581/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #2581       +/-   ##
   ============================================
   - Coverage     51.15%   9.68%   -41.47%     
   + Complexity     3212      48     -3164     
   ============================================
     Files           436      53      -383     
     Lines         19987    1931    -18056     
     Branches       2057     230     -1827     
   ============================================
   - Hits          10224     187    -10037     
   + Misses         8922    1731     -7191     
   + Partials        841      13      -828     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `?` | `?` | |
   | hudiclient | `?` | `?` | |
   | hudicommon | `?` | `?` | |
   | hudiflink | `?` | `?` | |
   | hudihadoopmr | `?` | `?` | |
   | hudisparkdatasource | `?` | `?` | |
   | hudisync | `?` | `?` | |
   | huditimelineservice | `?` | `?` | |
   | hudiutilities | `9.68% <ø> (-59.71%)` | `0.00 <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2581?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
   | [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-10.00%)` | |
   | [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [...apache/hudi/utilities/sources/JsonKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkthZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-6.00%)` | |
   | [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-5.00%)` | |
   | [...lities/schema/SchemaProviderWithPostProcessor.java](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQcm92aWRlcldpdGhQb3N0UHJvY2Vzc29yLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | ... and [410 more](https://codecov.io/gh/apache/hudi/pull/2581/diff?src=pr&el=tree-more) | |
   

