You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/02/21 09:58:27 UTC

[GitHub] [hudi] garyli1019 commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

garyli1019 commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r579779589



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -146,5 +209,69 @@ public void notifyCheckpointComplete(long l) {
     // Refresh the table state when there are new commits.
     this.bucketAssigner.reset();
     this.bucketAssigner.refreshTable();
+    checkPartitionsLoaded();
+  }
+
+  /**
+   * Load all the indices of give partition path into the backup state.
+   *
+   * @param partitionPath The partition path
+   * @throws Exception when error occurs for state update
+   */
+  private void loadRecords(String partitionPath) throws Exception {
+    HoodieTable<?, ?, ?, ?> hoodieTable = bucketAssigner.getTable();
+    List<HoodieBaseFile> latestBaseFiles =
+        HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, context, hoodieTable);
+    for (HoodieBaseFile baseFile : latestBaseFiles) {
+      List<HoodieKey> hoodieKeys =
+          ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new Path(baseFile.getPath()));
+      hoodieKeys.forEach(hoodieKey -> {
+        try {
+          this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
+        } catch (Exception e) {
+          throw new HoodieIOException("Error when load record keys from file: " + baseFile);
+        }
+      });
+    }
+    // Mark the partition path as loaded.
+    partitionLoadState.put(partitionPath, 0);

Review comment:
       maybe put a boolean?

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -146,5 +209,69 @@ public void notifyCheckpointComplete(long l) {
     // Refresh the table state when there are new commits.
     this.bucketAssigner.reset();
     this.bucketAssigner.refreshTable();
+    checkPartitionsLoaded();
+  }
+
+  /**
+   * Load all the indices of give partition path into the backup state.
+   *
+   * @param partitionPath The partition path
+   * @throws Exception when error occurs for state update
+   */
+  private void loadRecords(String partitionPath) throws Exception {
+    HoodieTable<?, ?, ?, ?> hoodieTable = bucketAssigner.getTable();
+    List<HoodieBaseFile> latestBaseFiles =
+        HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, context, hoodieTable);
+    for (HoodieBaseFile baseFile : latestBaseFiles) {
+      List<HoodieKey> hoodieKeys =
+          ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new Path(baseFile.getPath()));
+      hoodieKeys.forEach(hoodieKey -> {
+        try {
+          this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
+        } catch (Exception e) {
+          throw new HoodieIOException("Error when load record keys from file: " + baseFile);
+        }
+      });
+    }
+    // Mark the partition path as loaded.
+    partitionLoadState.put(partitionPath, 0);
+  }
+
+  /**
+   * Checks whether all the partitions of the table are loaded into the state,
+   * set the flag {@code allPartitionsLoaded} to true if it is.
+   */
+  private void checkPartitionsLoaded() {
+    for (String partition : this.allPartitionPath) {
+      try {
+        if (!this.partitionLoadState.contains(partition)) {
+          return;
+        }
+      } catch (Exception e) {
+        LOG.warn("Error when check whether all partitions are loaded, ignored", e);
+        throw new HoodieException(e);
+      }
+    }
+    this.allPartitionsLoaded = true;
+  }

Review comment:
       IIUC, this seems necessary cause we didn't update the `partitionLoadState` if we see a new partition in the upcoming records so we need to check after each commit. Otherwise, we need to update the `partitionLoadState` with `indexState` together.

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -78,13 +130,14 @@ public BucketAssignFunction(Configuration conf) {
   public void open(Configuration parameters) throws Exception {
     super.open(parameters);
     HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
-    HoodieFlinkEngineContext context =
-        new HoodieFlinkEngineContext(
-            new SerializableConfiguration(StreamerUtil.getHadoopConf()),
-            new FlinkTaskContextSupplier(getRuntimeContext()));
-    this.bucketAssigner = new BucketAssigner(
-        context,
-        writeConfig);
+    this.context = new HoodieFlinkEngineContext(
+        new SerializableConfiguration(StreamerUtil.getHadoopConf()),

Review comment:
       can we use `this.hadoopConf`, `getHadoopConf()` seems called twice. 

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
##########
@@ -37,6 +38,29 @@
  */
 public class HoodieIndexUtils {
 
+  /**
+   * Fetches Pair of partition path and {@link HoodieBaseFile}s for interested partitions.
+   *
+   * @param partition   Partition of interest
+   * @param context     Instance of {@link HoodieEngineContext} to use
+   * @param hoodieTable Instance of {@link HoodieTable} of interest
+   * @return the list of {@link HoodieBaseFile}
+   */
+  public static List<HoodieBaseFile> getLatestBaseFilesForPartition(

Review comment:
       shall we use this in `getLatestBaseFilesForAllPartitions` to avoid duplicate codes.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org