You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/03/19 04:14:17 UTC

[GitHub] [hudi] codope commented on a change in pull request #5070: [HUDI-3663] Fixing Column Stats index to properly handle first Data Table commit

codope commented on a change in pull request #5070:
URL: https://github.com/apache/hudi/pull/5070#discussion_r830436453



##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -838,55 +857,52 @@ public static HoodieTableFileSystemView getFileSystemView(HoodieTableMetaClient
   public static HoodieData<HoodieRecord> convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata,
                                                                              HoodieEngineContext engineContext,
                                                                              MetadataRecordsGenerationParams recordsGenerationParams) {
-    try {
-      List<HoodieWriteStat> allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream()
-          .flatMap(entry -> entry.stream()).collect(Collectors.toList());
-      return HoodieTableMetadataUtil.createColumnStatsFromWriteStats(engineContext, allWriteStats, recordsGenerationParams);
-    } catch (Exception e) {
-      throw new HoodieException("Failed to generate column stats records for metadata table ", e);
-    }
-  }
+    List<HoodieWriteStat> allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream()
+        .flatMap(Collection::stream).collect(Collectors.toList());
 
-  /**
-   * Create column stats from write status.
-   *
-   * @param engineContext           - Engine context
-   * @param allWriteStats           - Write status to convert
-   * @param recordsGenerationParams - Parameters for columns stats record generation
-   */
-  public static HoodieData<HoodieRecord> createColumnStatsFromWriteStats(HoodieEngineContext engineContext,
-                                                                         List<HoodieWriteStat> allWriteStats,
-                                                                         MetadataRecordsGenerationParams recordsGenerationParams) {
     if (allWriteStats.isEmpty()) {
       return engineContext.emptyHoodieData();
     }
-    final List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams.getDataMetaClient(), recordsGenerationParams.isAllColumnStatsIndexEnabled());
-    final int parallelism = Math.max(Math.min(allWriteStats.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
-    HoodieData<HoodieWriteStat> allWriteStatsRDD = engineContext.parallelize(allWriteStats, parallelism);
-    return allWriteStatsRDD.flatMap(writeStat -> translateWriteStatToColumnStats(writeStat, recordsGenerationParams.getDataMetaClient(), columnsToIndex).iterator());
+
+    try {
+      Option<Schema> writerSchema =
+          Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))
+              .map(writerSchemaStr -> new Schema.Parser().parse(writerSchemaStr));
+
+      HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient();
+
+      List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams,
+          dataTableMetaClient.getTableConfig(), writerSchema);
+
+      if (columnsToIndex.isEmpty()) {
+        // In case there are no columns to index, bail
+        return engineContext.emptyHoodieData();
+      }
+
+      int parallelism = Math.max(Math.min(allWriteStats.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
+      return engineContext.parallelize(allWriteStats, parallelism)
+          .flatMap(writeStat ->
+              translateWriteStatToColumnStats(writeStat, dataTableMetaClient, columnsToIndex).iterator());
+    } catch (Exception e) {
+      throw new HoodieException("Failed to generate column stats records for metadata table", e);
+    }
   }
 
   /**
    * Get the latest columns for the table for column stats indexing.
-   *
-   * @param datasetMetaClient                   - Data table meta client
-   * @param isMetaIndexColumnStatsForAllColumns - Is column stats indexing enabled for all columns
    */
-  private static List<String> getColumnsToIndex(HoodieTableMetaClient datasetMetaClient, boolean isMetaIndexColumnStatsForAllColumns) {
-    if (!isMetaIndexColumnStatsForAllColumns
-        || datasetMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants() < 1) {
-      return Arrays.asList(datasetMetaClient.getTableConfig().getRecordKeyFieldProp().split(","));
+  private static List<String> getColumnsToIndex(MetadataRecordsGenerationParams recordsGenParams,
+                                                HoodieTableConfig tableConfig,
+                                                Option<Schema> writerSchema) {
+    if (!recordsGenParams.isAllColumnStatsIndexEnabled()) {
+      // TODO why are we only indexing primary key? revisit fallback

Review comment:
      This will change once we add support for indexing multiple columns: https://github.com/apache/hudi/pull/4693/files#diff-11e9ef6bd53ef1001b669a1dc68dde2aba9b33c9eb72cc1e4198750336d79772
   No change is needed on your side; I'll take care of it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org