Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/02/19 03:05:19 UTC

[GitHub] [hudi] lamber-ken commented on a change in pull request #2581: [HUDI-1624] The state based index should bootstrap from existing base…

lamber-ken commented on a change in pull request #2581:
URL: https://github.com/apache/hudi/pull/2581#discussion_r578893011



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -146,5 +209,69 @@ public void notifyCheckpointComplete(long l) {
     // Refresh the table state when there are new commits.
     this.bucketAssigner.reset();
     this.bucketAssigner.refreshTable();
+    checkPartitionsLoaded();
+  }
+
+  /**
+   * Load all the indices of give partition path into the backup state.
+   *
+   * @param partitionPath The partition path
+   * @throws Exception when error occurs for state update
+   */
+  private void loadRecords(String partitionPath) throws Exception {
+    HoodieTable<?, ?, ?, ?> hoodieTable = bucketAssigner.getTable();
+    List<HoodieBaseFile> latestBaseFiles =
+        HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, context, hoodieTable);
+    for (HoodieBaseFile baseFile : latestBaseFiles) {
+      List<HoodieKey> hoodieKeys =
+          ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new Path(baseFile.getPath()));
+      hoodieKeys.forEach(hoodieKey -> {
+        try {
+          this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
+        } catch (Exception e) {
+          throw new HoodieIOException("Error when load record keys from file: " + baseFile);
+        }
+      });
+    }
+    // Mark the partition path as loaded.
+    partitionLoadState.put(partitionPath, 0);

Review comment:
       Please replace the magic number `0` with a named static constant.
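
A minimal sketch of the constant suggestion, not the PR's actual code. It assumes the stored value is only a placeholder and that the presence of the key is what marks a partition as loaded. The class name `PartitionLoadMarker` and the constant name `LOADED` are hypothetical, and a plain `Map` stands in for the `partitionLoadState` state so the snippet runs without Flink.

```java
import java.util.HashMap;
import java.util.Map;

class PartitionLoadMarker {
  // Hypothetical constant: names the placeholder value instead of a bare 0.
  private static final Integer LOADED = 0;

  // Stand-in for the partitionLoadState used in the PR (assumption: map-like state).
  private final Map<String, Integer> partitionLoadState = new HashMap<>();

  // Mark the partition path as loaded.
  void markLoaded(String partitionPath) {
    partitionLoadState.put(partitionPath, LOADED);
  }

  // A partition counts as loaded as soon as its key is present.
  boolean isLoaded(String partitionPath) {
    return partitionLoadState.containsKey(partitionPath);
  }
}
```

With a named constant, the call site reads as `put(partitionPath, LOADED)`, which makes it clear that the value carries no meaning of its own.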
   
   

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -112,6 +171,10 @@ public void processElement(I value, Context ctx, Collector<O> out) throws Except
     final HoodieKey hoodieKey = record.getKey();
     final BucketInfo bucketInfo;
     final HoodieRecordLocation location;
+    if (!allPartitionsLoaded && !partitionLoadState.contains(hoodieKey.getPartitionPath())) {

Review comment:
       The `allPartitionsLoaded` member variable seems redundant. Can we use only `partitionLoadState`?
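
A rough sketch of what relying only on `partitionLoadState` could look like, under stated assumptions. The class `LazyPartitionLoader` and the `loadCalls` list are hypothetical and exist only to make the behavior observable. A plain `Set` stands in for the state, and `loadRecords` is reduced to a placeholder.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class LazyPartitionLoader {
  // Stand-in for the state that records which partitions were loaded.
  private final Set<String> partitionLoadState = new HashSet<>();
  // Hypothetical: records each load so the behavior can be checked.
  final List<String> loadCalls = new ArrayList<>();

  void processElement(String partitionPath) {
    // No allPartitionsLoaded flag: the state lookup alone decides whether to load.
    if (!partitionLoadState.contains(partitionPath)) {
      loadRecords(partitionPath);
    }
  }

  private void loadRecords(String partitionPath) {
    loadCalls.add(partitionPath); // placeholder for reading the base files
    partitionLoadState.add(partitionPath); // mark the partition path as loaded
  }
}
```

One trade-off to weigh: the flag in the PR lets `processElement` skip the state lookup once every partition is loaded. Whether that saving matters depends on the cost of the state access, which this sketch does not measure.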

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
##########
@@ -146,5 +209,69 @@ public void notifyCheckpointComplete(long l) {
     // Refresh the table state when there are new commits.
     this.bucketAssigner.reset();
     this.bucketAssigner.refreshTable();
+    checkPartitionsLoaded();
+  }
+
+  /**
+   * Load all the indices of give partition path into the backup state.
+   *
+   * @param partitionPath The partition path
+   * @throws Exception when error occurs for state update
+   */
+  private void loadRecords(String partitionPath) throws Exception {
+    HoodieTable<?, ?, ?, ?> hoodieTable = bucketAssigner.getTable();
+    List<HoodieBaseFile> latestBaseFiles =
+        HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, context, hoodieTable);
+    for (HoodieBaseFile baseFile : latestBaseFiles) {
+      List<HoodieKey> hoodieKeys =
+          ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new Path(baseFile.getPath()));
+      hoodieKeys.forEach(hoodieKey -> {
+        try {
+          this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
+        } catch (Exception e) {
+          throw new HoodieIOException("Error when load record keys from file: " + baseFile);
+        }
+      });
+    }
+    // Mark the partition path as loaded.
+    partitionLoadState.put(partitionPath, 0);
+  }
+
+  /**
+   * Checks whether all the partitions of the table are loaded into the state,
+   * set the flag {@code allPartitionsLoaded} to true if it is.
+   */
+  private void checkPartitionsLoaded() {
+    for (String partition : this.allPartitionPath) {
+      try {
+        if (!this.partitionLoadState.contains(partition)) {
+          return;
+        }
+      } catch (Exception e) {
+        LOG.warn("Error when check whether all partitions are loaded, ignored", e);
+        throw new HoodieException(e);
+      }
+    }
+    this.allPartitionsLoaded = true;
+  }

Review comment:
       The `checkPartitionsLoaded()` method seems redundant too, like `allPartitionsLoaded`.



