Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/02/18 10:57:21 UTC

[GitHub] [hudi] xushiyan commented on a change in pull request #4847: [HUDI-3042] Refactoring clustering executors

xushiyan commented on a change in pull request #4847:
URL: https://github.com/apache/hudi/pull/4847#discussion_r809894233



##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java
##########
@@ -18,111 +18,48 @@
 
 package org.apache.hudi.table.action.cluster;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieFileGroupId;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.ClusteringUtils;
-import org.apache.hudi.common.util.CommitUtils;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
 import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor;
 
-import org.apache.avro.Schema;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
 public class SparkExecuteClusteringCommitActionExecutor<T extends HoodieRecordPayload<T>>
     extends BaseSparkCommitActionExecutor<T> {
 
-  private static final Logger LOG = LogManager.getLogger(SparkExecuteClusteringCommitActionExecutor.class);
   private final HoodieClusteringPlan clusteringPlan;
 
   public SparkExecuteClusteringCommitActionExecutor(HoodieEngineContext context,
                                                     HoodieWriteConfig config, HoodieTable table,
                                                     String instantTime) {
     super(context, config, table, instantTime, WriteOperationType.CLUSTER);
-    this.clusteringPlan = ClusteringUtils.getClusteringPlan(table.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(instantTime))
-      .map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException("Unable to read clustering plan for instant: " + instantTime));
+    this.clusteringPlan = ClusteringUtils.getClusteringPlan(
+        table.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(instantTime))
+        .map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException(
+            "Unable to read clustering plan for instant: " + instantTime));
   }
 
   @Override
   public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
-    HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(instantTime);
-    // Mark instant as clustering inflight
-    table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
-    table.getMetaClient().reloadActiveTimeline();
-
-    final Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata = ((ClusteringExecutionStrategy<T, JavaRDD<HoodieRecord<? extends HoodieRecordPayload>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>>)
-        ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(),
-            new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config))
-        .performClustering(clusteringPlan, schema, instantTime);
-    JavaRDD<WriteStatus> writeStatusRDD = writeMetadata.getWriteStatuses();
-    JavaRDD<WriteStatus> statuses = updateIndex(writeStatusRDD, writeMetadata);
-    writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collect());
-    writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(writeMetadata));
-    commitOnAutoCommit(writeMetadata);
-    if (!writeMetadata.getCommitMetadata().isPresent()) {
-      HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeMetadata.getWriteStats().get(), writeMetadata.getPartitionToReplaceFileIds(),
-          extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());
-      writeMetadata.setCommitMetadata(Option.of(commitMetadata));
-    }
-    return writeMetadata;

Review comment:
       extracted to `executeClustering()` in BaseCommitActionExecutor.java
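
The removed `execute()` body now lives in `executeClustering()` in BaseCommitActionExecutor.java, which is not quoted in this message. As a rough illustration of that extraction, the sketch below takes the removed Spark-specific body from the hunk above and generalizes the write-status container to the engine-agnostic `HoodieData<WriteStatus>` (hinted at by the new imports). The generics, the protected placement, and the explicit `clusteringPlan` argument to the helpers are assumptions, not the actual PR code; the types are the ones imported in the diff above.

```java
// Rough sketch only: the removed Spark execute() body, generalized over the
// engine-agnostic HoodieData<WriteStatus>. Helper names mirror the code removed
// above; passing clusteringPlan into the helpers is an assumption.
protected HoodieWriteMetadata<HoodieData<WriteStatus>> executeClustering(HoodieClusteringPlan clusteringPlan) {
  HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(instantTime);
  // Mark the replacecommit instant as clustering inflight before running the strategy.
  table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
  table.getMetaClient().reloadActiveTimeline();

  final Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
  // Load the configured clustering execution strategy reflectively and run it on the plan.
  HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata =
      ((ClusteringExecutionStrategy<T, ?, ?, HoodieData<WriteStatus>>) ReflectionUtils.loadClass(
          config.getClusteringExecutionStrategyClass(),
          new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class},
          table, context, config))
          .performClustering(clusteringPlan, schema, instantTime);

  // Update the index, collect write stats, and record which file groups were replaced.
  HoodieData<WriteStatus> statuses = updateIndex(writeMetadata.getWriteStatuses(), writeMetadata);
  writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collectAsList());
  writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(clusteringPlan, writeMetadata));
  commitOnAutoCommit(writeMetadata);
  if (!writeMetadata.getCommitMetadata().isPresent()) {
    HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(
        writeMetadata.getWriteStats().get(), writeMetadata.getPartitionToReplaceFileIds(),
        extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());
    writeMetadata.setCommitMetadata(Option.of(commitMetadata));
  }
  return writeMetadata;
}
```

With something like that in place, the Spark executor's new `execute()` shown in the diff only unwraps the result via `HoodieJavaRDD.getJavaRDD(...)` and clones the write metadata around the JavaRDD-typed statuses.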

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java
##########
@@ -18,111 +18,48 @@
 
 package org.apache.hudi.table.action.cluster;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieFileGroupId;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.ClusteringUtils;
-import org.apache.hudi.common.util.CommitUtils;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
 import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor;
 
-import org.apache.avro.Schema;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
 public class SparkExecuteClusteringCommitActionExecutor<T extends HoodieRecordPayload<T>>
     extends BaseSparkCommitActionExecutor<T> {
 
-  private static final Logger LOG = LogManager.getLogger(SparkExecuteClusteringCommitActionExecutor.class);
   private final HoodieClusteringPlan clusteringPlan;
 
   public SparkExecuteClusteringCommitActionExecutor(HoodieEngineContext context,
                                                     HoodieWriteConfig config, HoodieTable table,
                                                     String instantTime) {
     super(context, config, table, instantTime, WriteOperationType.CLUSTER);
-    this.clusteringPlan = ClusteringUtils.getClusteringPlan(table.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(instantTime))
-      .map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException("Unable to read clustering plan for instant: " + instantTime));
+    this.clusteringPlan = ClusteringUtils.getClusteringPlan(
+        table.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(instantTime))
+        .map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException(
+            "Unable to read clustering plan for instant: " + instantTime));
   }
 
   @Override
   public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
-    HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(instantTime);
-    // Mark instant as clustering inflight
-    table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
-    table.getMetaClient().reloadActiveTimeline();
-
-    final Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata = ((ClusteringExecutionStrategy<T, JavaRDD<HoodieRecord<? extends HoodieRecordPayload>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>>)
-        ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(),
-            new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config))
-        .performClustering(clusteringPlan, schema, instantTime);
-    JavaRDD<WriteStatus> writeStatusRDD = writeMetadata.getWriteStatuses();
-    JavaRDD<WriteStatus> statuses = updateIndex(writeStatusRDD, writeMetadata);
-    writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collect());
-    writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(writeMetadata));
-    commitOnAutoCommit(writeMetadata);
-    if (!writeMetadata.getCommitMetadata().isPresent()) {
-      HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeMetadata.getWriteStats().get(), writeMetadata.getPartitionToReplaceFileIds(),
-          extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());
-      writeMetadata.setCommitMetadata(Option.of(commitMetadata));
-    }
-    return writeMetadata;
-  }
-
-  /**
-   * Validate actions taken by clustering. In the first implementation, we validate at least one new file is written.
-   * But we can extend this to add more validation. E.g. number of records read = number of records written etc.
-   * We can also make these validations in BaseCommitActionExecutor to reuse pre-commit hooks for multiple actions.
-   */
-  private void validateWriteResult(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
-    if (writeMetadata.getWriteStatuses().isEmpty()) {
-      throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + instantTime
-          + " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least "
-          + clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
-          + " write statuses");
-    }
+    HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = executeClustering(clusteringPlan);
+    JavaRDD<WriteStatus> transformedWriteStatuses = HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses());
+    return writeMetadata.clone(transformedWriteStatuses);
   }
 
   @Override
   protected String getCommitActionType() {
     return HoodieTimeline.REPLACE_COMMIT_ACTION;
   }
-
-  @Override
-  protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
-    Set<HoodieFileGroupId> newFilesWritten = writeMetadata.getWriteStats().get().stream()
-        .map(s -> new HoodieFileGroupId(s.getPartitionPath(), s.getFileId())).collect(Collectors.toSet());
-    // for the below execution strategy, new file group id would be same as old file group id
-    if (SparkSingleFileSortExecutionStrategy.class.getName().equals(config.getClusteringExecutionStrategyClass())) {
-      return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan)
-          .collect(Collectors.groupingBy(fg -> fg.getPartitionPath(), Collectors.mapping(fg -> fg.getFileId(), Collectors.toList())));
-    }
-    return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan)
-        .filter(fg -> !newFilesWritten.contains(fg))
-        .collect(Collectors.groupingBy(fg -> fg.getPartitionPath(), Collectors.mapping(fg -> fg.getFileId(), Collectors.toList())));
-  }

Review comment:
       extracted to BaseCommitActionExecutor.java
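
The replaced-file-id computation moves along with it. A minimal sketch of the relocated logic, assuming only the write-status container changes to `HoodieData<WriteStatus>` and that the clustering plan is passed in explicitly; the actual signature in BaseCommitActionExecutor.java is not shown in this message:

```java
// Rough sketch only: the removed Spark-side logic with the write-status container
// switched to HoodieData<WriteStatus>; the explicit clusteringPlan parameter is an
// assumption about how the shared version gets hold of the plan.
protected Map<String, List<String>> getPartitionToReplacedFileIds(
    HoodieClusteringPlan clusteringPlan, HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
  Set<HoodieFileGroupId> newFilesWritten = writeMetadata.getWriteStats().get().stream()
      .map(s -> new HoodieFileGroupId(s.getPartitionPath(), s.getFileId()))
      .collect(Collectors.toSet());
  // The single-file-sort strategy rewrites a file group in place, so the new file group
  // ids equal the old ones and every file group in the plan is reported as replaced.
  if (SparkSingleFileSortExecutionStrategy.class.getName().equals(config.getClusteringExecutionStrategyClass())) {
    return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan)
        .collect(Collectors.groupingBy(HoodieFileGroupId::getPartitionPath,
            Collectors.mapping(HoodieFileGroupId::getFileId, Collectors.toList())));
  }
  // Otherwise only file groups from the plan whose ids were not also written anew count as replaced.
  return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan)
      .filter(fg -> !newFilesWritten.contains(fg))
      .collect(Collectors.groupingBy(HoodieFileGroupId::getPartitionPath,
          Collectors.mapping(HoodieFileGroupId::getFileId, Collectors.toList())));
}
```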

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java
##########
@@ -18,111 +18,48 @@
 
 package org.apache.hudi.table.action.cluster;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieFileGroupId;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.ClusteringUtils;
-import org.apache.hudi.common.util.CommitUtils;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
 import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor;
 
-import org.apache.avro.Schema;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
 public class SparkExecuteClusteringCommitActionExecutor<T extends HoodieRecordPayload<T>>
     extends BaseSparkCommitActionExecutor<T> {
 
-  private static final Logger LOG = LogManager.getLogger(SparkExecuteClusteringCommitActionExecutor.class);
   private final HoodieClusteringPlan clusteringPlan;
 
   public SparkExecuteClusteringCommitActionExecutor(HoodieEngineContext context,
                                                     HoodieWriteConfig config, HoodieTable table,
                                                     String instantTime) {
     super(context, config, table, instantTime, WriteOperationType.CLUSTER);
-    this.clusteringPlan = ClusteringUtils.getClusteringPlan(table.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(instantTime))
-      .map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException("Unable to read clustering plan for instant: " + instantTime));
+    this.clusteringPlan = ClusteringUtils.getClusteringPlan(
+        table.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(instantTime))
+        .map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException(
+            "Unable to read clustering plan for instant: " + instantTime));
   }
 
   @Override
   public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
-    HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(instantTime);
-    // Mark instant as clustering inflight
-    table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
-    table.getMetaClient().reloadActiveTimeline();
-
-    final Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata = ((ClusteringExecutionStrategy<T, JavaRDD<HoodieRecord<? extends HoodieRecordPayload>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>>)
-        ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(),
-            new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config))
-        .performClustering(clusteringPlan, schema, instantTime);
-    JavaRDD<WriteStatus> writeStatusRDD = writeMetadata.getWriteStatuses();
-    JavaRDD<WriteStatus> statuses = updateIndex(writeStatusRDD, writeMetadata);
-    writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collect());
-    writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(writeMetadata));
-    commitOnAutoCommit(writeMetadata);
-    if (!writeMetadata.getCommitMetadata().isPresent()) {
-      HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeMetadata.getWriteStats().get(), writeMetadata.getPartitionToReplaceFileIds(),
-          extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());
-      writeMetadata.setCommitMetadata(Option.of(commitMetadata));
-    }
-    return writeMetadata;
-  }
-
-  /**
-   * Validate actions taken by clustering. In the first implementation, we validate at least one new file is written.
-   * But we can extend this to add more validation. E.g. number of records read = number of records written etc.
-   * We can also make these validations in BaseCommitActionExecutor to reuse pre-commit hooks for multiple actions.
-   */
-  private void validateWriteResult(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
-    if (writeMetadata.getWriteStatuses().isEmpty()) {
-      throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + instantTime
-          + " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least "
-          + clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
-          + " write statuses");
-    }

Review comment:
       extracted to BaseCommitActionExecutor.java
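
The write-result validation is extracted the same way. A minimal sketch, assuming the same emptiness check as the removed method and an explicit plan parameter (again, the real signature may differ):

```java
// Rough sketch only: same "at least one WriteStatus" check as the removed method above,
// with the plan passed in explicitly; the real signature in BaseCommitActionExecutor.java
// may differ.
protected void validateWriteResult(HoodieClusteringPlan clusteringPlan,
                                   HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
  if (writeMetadata.getWriteStatuses().isEmpty()) {
    throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + instantTime
        + " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least "
        + clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
        + " write statuses");
  }
}
```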




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org