Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/09/15 22:09:17 UTC

[GitHub] [hudi] suryaprasanna commented on a diff in pull request #5958: [HUDI-3900] [UBER] Support log compaction action for MOR tables

suryaprasanna commented on code in PR #5958:
URL: https://github.com/apache/hudi/pull/5958#discussion_r967711327


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java:
##########
@@ -138,6 +144,20 @@ public class HoodieCompactionConfig extends HoodieConfig {
       .sinceVersion("0.11.0")
       .withDocumentation("When rewriting data, preserves existing hoodie_commit_time");
 
+  public static final ConfigProperty<String> PARTITIONS_FOR_COMPACTION = ConfigProperty
+      .key("hoodie.compaction.target.partitions")
+      .defaultValue("")
+      .withDocumentation("Used by org.apache.hudi.table.action.compact.strategy.SpecificPartitionCompactionStrategy "
+          + "to filter the required partitions to compact. This takes a string value with partitions separated by comma. "
+          + "Empty value implies no filtering so all the partitions are selected.");
+
+  public static final ConfigProperty<String> PARTITIONS_FOR_LOG_COMPACTION = ConfigProperty

Review Comment:
   Will remove this as well.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java:
##########
@@ -266,6 +272,20 @@ public class HoodieCompactionConfig extends HoodieConfig {
       .sinceVersion("0.11.0")
       .withDocumentation("When rewriting data, preserves existing hoodie_commit_time");
 
+  public static final ConfigProperty<String> PARTITIONS_FOR_COMPACTION = ConfigProperty

Review Comment:
   Sure, will remove the config to run log compaction on specified partitions.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -369,7 +370,7 @@ public void bootstrap(Option<Map<String, String>> extraMetadata) {
   protected void rollbackFailedBootstrap() {
     LOG.info("Rolling back pending bootstrap if present");
     HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
-    HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
+    HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingMajorAndMinorCompaction();

Review Comment:
   Discussed this offline; parking it for now.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java:
##########
@@ -36,7 +36,7 @@ public class HoodieCommandBlock extends HoodieLogBlock {
    * Hoodie command block type enum.
    */
   public enum HoodieCommandBlockTypeEnum {
-    ROLLBACK_PREVIOUS_BLOCK
+    ROLLBACK_BLOCK

Review Comment:
   Yes, this is backward compatible. We are storing the ordinal value of the enum on disk, so we should be alright.
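   
   For reference, a minimal self-contained sketch (not the actual HoodieLogBlock serialization code) of why the rename is safe as long as the constant keeps its position in the enum:
   
   public class OrdinalCompatSketch {
     enum HoodieCommandBlockTypeEnum {
       ROLLBACK_BLOCK // previously ROLLBACK_PREVIOUS_BLOCK; still ordinal 0
     }
   
     public static void main(String[] args) {
       // Only the ordinal is persisted in the log block, never the constant name.
       int persisted = HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal();
       // On read, the value is resolved by position, so old files still map to the renamed constant.
       HoodieCommandBlockTypeEnum readBack = HoodieCommandBlockTypeEnum.values()[persisted];
       System.out.println(persisted + " -> " + readBack); // 0 -> ROLLBACK_BLOCK
     }
   }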



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -369,7 +370,7 @@ public void bootstrap(Option<Map<String, String>> extraMetadata) {
   protected void rollbackFailedBootstrap() {
     LOG.info("Rolling back pending bootstrap if present");
     HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
-    HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
+    HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingMajorAndMinorCompaction();

Review Comment:
   As of now, we have standardized on the "major" and "minor" compaction terminology for all the Timeline APIs. If needed, we can follow up.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java:
##########
@@ -118,6 +118,12 @@ public static HoodieArchivedMetaEntry createMetaWrapper(HoodieInstant hoodieInst
         archivedMetaWrapper.setActionType(ActionType.compaction.name());
         break;
       }
+      case HoodieTimeline.LOG_COMPACTION_ACTION: {

Review Comment:
   Yes, an archival test is added in the TestHoodieClientOnMergeOnReadStorage class.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -456,6 +491,23 @@ public List<WriteStatus> close() {
     }
   }
 
+  public void write(Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordMap) {
+    Iterator<String> keyIterator = recordMap.keySet().stream().iterator();

Review Comment:
   Sure.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/SpecificPartitionCompactionStrategy.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact.strategy;
+
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class SpecificPartitionCompactionStrategy extends CompactionStrategy {

Review Comment:
   Removing these classes, as we thought it would be better not to restrict log compaction to specific partitions.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact.plan.generators;
+
+import org.apache.hudi.avro.model.HoodieCompactionOperation;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieCompactionStrategy;
+import org.apache.hudi.common.data.HoodieAccumulator;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.compact.LogCompactionExecutionStrategy;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.util.stream.Collectors.toList;
+
+public class HoodieLogCompactionPlanGenerator<T extends HoodieRecordPayload, I, K, O> extends BaseHoodieCompactionPlanGenerator {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieLogCompactionPlanGenerator.class);
+
+  public HoodieLogCompactionPlanGenerator(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
+    super(table, engineContext, writeConfig);
+  }
+
+  /**
+   * Generate a new log compaction plan for scheduling.
+   * @return Log Compaction Plan
+   * @throws java.io.IOException when encountering errors
+   */
+  @Override
+  public HoodieCompactionPlan generateCompactionPlan() {
+
+    // While scheduling log compaction (i.e. minor compaction) make sure only one log compaction is scheduled for a latest file Slice.
+    // Major compaction anyway will take care of creating a new base file, so if there is a pending compaction then log compaction
+    // need not be scheduled for previous file slice.
+    // Therefore, log compaction will only be scheduled for latest file slice or always for last file slice.
+    SyncableFileSystemView fileSystemView = (SyncableFileSystemView) this.hoodieTable.getSliceView();
+
+    // Accumulator to keep track of total log files for a table
+    HoodieAccumulator totalLogFiles = this.engineContext.newAccumulator();
+    // Accumulator to keep track of total log file slices for a table
+    HoodieAccumulator totalFileSlices = this.engineContext.newAccumulator();
+
+    HoodieTableMetaClient metaClient = this.hoodieTable.getMetaClient();
+
+    // Filter partition paths.
+    List<String> partitionPaths = FSUtils.getAllPartitionPaths(this.engineContext, writeConfig.getMetadataConfig(),
+        metaClient.getBasePath());
+
+    // Compaction Strategy should be SpecificPartitionCompactionStrategy to run a logcompaction on a specified partition.
+    partitionPaths = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, partitionPaths);
+
+    // Collect all pending compaction file groups
+    Set<HoodieFileGroupId> fgIdsInPendingCompactionAndClustering = fileSystemView.getPendingCompactionOperations()
+        .map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId())
+        .collect(Collectors.toSet());
+
+    // Collect all pending log compaction file groups
+    fgIdsInPendingCompactionAndClustering.addAll(fileSystemView.getPendingLogCompactionOperations()
+        .map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId())
+        .collect(Collectors.toSet()));
+
+    // Collect all pending clustering file groups
+    fgIdsInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering()
+        .map(Pair::getLeft).collect(Collectors.toSet()));
+
+    String maxInstantTime = hoodieTable.getMetaClient()
+        .getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
+            HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
+        .filterCompletedInstants().lastInstant().get().getTimestamp();
+
+    // Here two different filters are applied before scheduling log compaction.
+    // 1. Exclude all the file groups which are either part of a pending compaction or clustering plans.
+    // 2. Check if FileSlices are meeting the criteria for LogCompaction.
+    List<HoodieCompactionOperation> operations = engineContext.flatMap(partitionPaths, partitionPath -> fileSystemView
+        .getLatestFileSlices(partitionPath)
+        .filter(fileSlice -> !fgIdsInPendingCompactionAndClustering.contains(fileSlice.getFileGroupId()))

Review Comment:
   Yes, only one table service can be executed on a file group at a time.



##########
hudi-common/src/main/avro/HoodieCompactionOperation.avsc:
##########
@@ -84,6 +84,18 @@
        "name":"version",
        "type":["int", "null"],
        "default": 1
+    },
+    {
+       "name":"strategy",
+       "type":[
+         "null", "HoodieCompactionStrategy"
+       ],
+       "default": null
+    },
+    {
+       "name":"preserveHoodieMetadata",
+       "type":["boolean", "null"],
+       "default": false

Review Comment:
   I tried changing it, but I got runtime errors. It looks like if we have a non-null default value, we cannot list null as the first type in the union.
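   
   A minimal sketch of that rule, assuming an Avro version where Schema.Parser#setValidateDefaults is available (the declared default has to match the first branch of the union):
   
   import org.apache.avro.AvroRuntimeException;
   import org.apache.avro.Schema;
   
   public class AvroUnionDefaultSketch {
     public static void main(String[] args) {
       // Valid: the default (false) matches the FIRST branch of the union.
       String ok = "{\"type\":\"record\",\"name\":\"Op\",\"fields\":["
           + "{\"name\":\"preserveHoodieMetadata\",\"type\":[\"boolean\",\"null\"],\"default\":false}]}";
       // Invalid: with a non-null default, "null" cannot be the first branch.
       String bad = "{\"type\":\"record\",\"name\":\"Op\",\"fields\":["
           + "{\"name\":\"preserveHoodieMetadata\",\"type\":[\"null\",\"boolean\"],\"default\":false}]}";
   
       new Schema.Parser().setValidateDefaults(true).parse(ok);   // parses fine
       try {
         new Schema.Parser().setValidateDefaults(true).parse(bad);
       } catch (AvroRuntimeException e) {
         System.out.println("Rejected as expected: " + e.getMessage());
       }
     }
   }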



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java:
##########
@@ -379,14 +435,29 @@ static HoodieInstant getIndexInflightInstant(final String timestamp) {
 
   /**
    * Returns the inflight instant corresponding to the instant being passed. Takes care of changes in action names
-   * between inflight and completed instants (compaction <=> commit).
+   * between inflight and completed instants (compaction <=> commit) and (logcompaction <==> deltacommit).
    * @param instant Hoodie Instant
-   * @param tableType Hoodie Table Type
+   * @param metaClient Hoodie metaClient to fetch tableType and fileSystem.
    * @return Inflight Hoodie Instant
    */
-  static HoodieInstant getInflightInstant(final HoodieInstant instant, final HoodieTableType tableType) {
-    if ((tableType == HoodieTableType.MERGE_ON_READ) && instant.getAction().equals(COMMIT_ACTION)) {
-      return new HoodieInstant(true, COMPACTION_ACTION, instant.getTimestamp());
+  static HoodieInstant getInflightInstant(final HoodieInstant instant, final HoodieTableMetaClient metaClient) {
+    if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
+      if (instant.getAction().equals(COMMIT_ACTION)) {
+        return new HoodieInstant(true, COMPACTION_ACTION, instant.getTimestamp());
+      } else if (instant.getAction().equals(DELTA_COMMIT_ACTION)) {
+        // Deltacommit is used by both ingestion and logcompaction.
+        // So, distinguish both of them check for the inflight file being present.
+        FileSystem fs = metaClient.getFs();
+        String logCompactionRequestedFile = instant.getTimestamp() + "logcompaction.requested";
+        Path path = new Path(metaClient.getMetaPath(), logCompactionRequestedFile);
+        try {
+          if (fs.exists(path)) {

Review Comment:
   Sure, I am using the following approach to find the requested instant:
   
   HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
   Option<HoodieInstant> logCompactionInstant = Option.fromJavaOptional(rawActiveTimeline.getInstants()
       .filter(hoodieInstant -> hoodieInstant.getTimestamp().equals(instant.getTimestamp())
           && LOG_COMPACTION_ACTION.equals(hoodieInstant.getAction())).findFirst());



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -1146,7 +1202,7 @@ public HoodieTimeline getTimeline() {
   @Override
   public void sync() {
     HoodieTimeline oldTimeline = getTimeline();
-    HoodieTimeline newTimeline = metaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants();
+    HoodieTimeline newTimeline = metaClient.reloadActiveTimeline().filterCompletedInstantsOrRewriteTimeline();

Review Comment:
   Incremental sync does not support clustering transitions. I want to add that fix in another PR, so I have included rewrites in the timeline as well.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java:
##########
@@ -143,6 +148,11 @@ protected Map<HoodieFileGroupId, Pair<String, CompactionOperation>> createFileId
     return fileIdToPendingCompaction;
   }
 
+  protected Map<HoodieFileGroupId, Pair<String, CompactionOperation>> createFileIdToPendingLogCompactionMap(
+      Map<HoodieFileGroupId, Pair<String, CompactionOperation>> fileIdToPendingLogCompaction) {
+    return fileIdToPendingLogCompaction;

Review Comment:
   Here we are just returning the fileIdToPendingLogCompaction value, but this method is overridden in the SpillableMapBasedFileSystemView class.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java:
##########
@@ -147,6 +152,21 @@ public HoodieBootstrapWriteMetadata<HoodieData<WriteStatus>> bootstrap(HoodieEng
     return new SparkBootstrapDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, extraMetadata).execute();
   }
 
+  @Override
+  public Option<HoodieCompactionPlan> scheduleLogCompaction(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
+    ScheduleCompactionActionExecutor scheduleLogCompactionExecutor = new ScheduleCompactionActionExecutor(

Review Comment:
   Discussed this offline; we can ignore this comment.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactionTriggerStrategy.java:
##########
@@ -26,5 +26,8 @@ public enum CompactionTriggerStrategy {
     // trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied
     NUM_AND_TIME,
     // trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied
-    NUM_OR_TIME
+    NUM_OR_TIME,
+    // Always triggers. This is way to port the condition check from ScheduleCompactionActionExecutor
+    // towards the plan generators. Ideally done when there are complex condition checks.
+    ALWAYS_ALLOW

Review Comment:
   For log compaction, I always want to allow scheduling from here and check the conditions in the planner class. Let me remove this and add an explicit check.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java:
##########
@@ -540,6 +565,10 @@ public void rollbackInflightCompaction(HoodieInstant inflightInstant) {
     rollbackInflightCompaction(inflightInstant, s -> Option.empty());
   }
 
+  public void rollbackInflightLogCompaction(HoodieInstant inflightInstant) {
+    rollbackInflightLogCompaction(inflightInstant, s -> Option.empty());

Review Comment:
   Yes, that is a better approach.
   The logic to fetch pending rollback instants is part of BaseHoodieWriteClient, and refactoring would require more effort, so I have created a follow-up ticket to address this:
   https://issues.apache.org/jira/browse/HUDI-4829



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -456,6 +491,23 @@ public List<WriteStatus> close() {
     }
   }
 
+  public void write(Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordMap) {
+    Iterator<String> keyIterator = recordMap.keySet().stream().iterator();
+    try {
+      while (keyIterator.hasNext()) {
+        final String key = keyIterator.next();
+        HoodieRecord<T> record = (HoodieRecord<T>) recordMap.get(key);
+        init(record);
+        // For logCompaction operations all the records are read and written as a huge block.

Review Comment:
   I have also made a change to ensure only one delete block is created per append operation, instead of delete blocks being interleaved between data blocks.
   Example:
   Previously, HoodieAppendHandle would create blocks in this order:
   Data1, Delete1, Data2, Delete2, Data3, Delete3
   After the change, the data is written as:
   Data1, Data2, Data3, DeleteX (which consolidates the changes from the Delete1, Delete2, and Delete3 blocks)
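   
   A simplified, self-contained sketch of the intended layout (helper names mirror HoodieAppendHandle, but this is not the PR code): intermediate flushes append data blocks only, and the buffered deletes go out as one trailing block.
   
   import java.util.ArrayList;
   import java.util.List;
   
   public class AppendBlockOrderingSketch {
     private final List<String> blocks = new ArrayList<>();
     private final List<String> bufferedRecords = new ArrayList<>();
     private final List<String> bufferedDeletes = new ArrayList<>();
   
     void write(String key, boolean isDelete) {
       if (isDelete) {
         bufferedDeletes.add(key);              // deletes are only buffered here
       } else {
         bufferedRecords.add(key);
         if (bufferedRecords.size() >= 2) {     // stand-in for the block-size check
           appendDataAndDeleteBlocks(false);    // intermediate flush: data block only
         }
       }
     }
   
     void close() {
       appendDataAndDeleteBlocks(true);         // final flush: last data block + the single delete block
       System.out.println(blocks);              // [Data(2), Data(1), Delete(2)]
     }
   
     private void appendDataAndDeleteBlocks(boolean appendDeleteBlocks) {
       if (!bufferedRecords.isEmpty()) {
         blocks.add("Data(" + bufferedRecords.size() + ")");
         bufferedRecords.clear();
       }
       if (appendDeleteBlocks && !bufferedDeletes.isEmpty()) {
         blocks.add("Delete(" + bufferedDeletes.size() + ")");
         bufferedDeletes.clear();
       }
     }
   
     public static void main(String[] args) {
       AppendBlockOrderingSketch handle = new AppendBlockOrderingSketch();
       handle.write("k1", false);
       handle.write("k2", true);
       handle.write("k3", false); // triggers an intermediate flush of Data(2)
       handle.write("k4", false);
       handle.write("k5", true);
       handle.close();
     }
   }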



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java:
##########
@@ -163,6 +183,21 @@ public class HoodieCompactionConfig extends HoodieConfig {
           + "record size estimate compute dynamically based on commit metadata. "
           + " This is critical in computing the insert parallelism and bin-packing inserts into small files.");
 
+  public static final ConfigProperty<Long> ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES = ConfigProperty
+      .key("hoodie.archive.merge.small.file.limit.bytes")

Review Comment:
   I think these got added while rebasing. Let me remove them.



##########
hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java:
##########
@@ -151,6 +173,19 @@ public static HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaC
     return migrator.upgradeToLatest(compactionPlan, compactionPlan.getVersion());
   }
 
+  /**
+   * This method will serve only log compaction instants,
+   * because we use same HoodieCompactionPlan for both the operations.
+   */
+  public static HoodieCompactionPlan getLogCompactionPlan(HoodieTableMetaClient metaClient,
+                                                       String logCompactionInstant) throws IOException {
+    HoodieInstant compactionRequestedInstant = HoodieTimeline.getLogCompactionRequestedInstant(logCompactionInstant);
+    CompactionPlanMigrator migrator = new CompactionPlanMigrator(metaClient);

Review Comment:
   Just to be safe, I added this. I can remove it if it is not required.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -379,11 +407,16 @@ public void doAppend() {
       flushToDiskIfRequired(record);
       writeToBuffer(record);
     }
-    appendDataAndDeleteBlocks(header);
+    appendDataAndDeleteBlocks(header, true);
     estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords;
   }
 
-  protected void appendDataAndDeleteBlocks(Map<HeaderMetadataType, String> header) {
+  /**
+   * Appends data and delete blocks. When appendDeleteBlocks value is false, only data blocks are appended.
+   * This is done so that all the data blocks are created first and then a single delete block is added.
+   * Otherwise what can end up happening is creation of multiple small delete blocks get added after each data block.

Review Comment:
   Log compaction happens on consecutive blocks; both data and delete blocks are compacted.
   In the scenario you mentioned, what ends up happening is that HoodieMergedLogRecordScanner creates a list of HoodieRecords in which the updated records are already merged. So, by the time we reach HoodieAppendHandle we are dealing only with unique HoodieRecords, and I think this case is already handled.
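   
   A toy illustration of that merge-by-key guarantee (not the scanner code, just the shape of the behavior):
   
   import java.util.Arrays;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   
   public class MergeByKeySketch {
     public static void main(String[] args) {
       // Entries from consecutive log blocks, in scan order; key1 is updated in a later block.
       List<String[]> blocks = Arrays.asList(
           new String[] {"key1=v1", "key2=v1"},
           new String[] {"key1=v2"},
           new String[] {"key3=v1"});
   
       Map<String, String> merged = new LinkedHashMap<>();
       for (String[] block : blocks) {
         for (String entry : block) {
           String[] kv = entry.split("=");
           merged.put(kv[0], kv[1]); // later version replaces the earlier one, mirroring payload merging by key
         }
       }
       // One record per key reaches the append handle: {key1=v2, key2=v1, key3=v1}
       System.out.println(merged);
     }
   }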



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java:
##########
@@ -83,4 +83,16 @@ public void completeInflightCompaction(HoodieTable table, String compactionCommi
           "Failed to commit " + table.getMetaClient().getBasePath() + " at time " + compactionCommitTime, e);
     }
   }
+
+  public void completeInflightLogCompaction(HoodieTable table, String logCompactionCommitTime, HoodieCommitMetadata commitMetadata) {

Review Comment:
   I tried doing that, but then the logic needs to be handled with more if conditions and compaction-type checks. So, I created individual methods to differentiate them; I feel this is more reader friendly. Let me know what your thoughts are on this.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -456,6 +491,23 @@ public List<WriteStatus> close() {
     }
   }
 
+  public void write(Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordMap) {
+    Iterator<String> keyIterator = recordMap.keySet().stream().iterator();
+    try {
+      while (keyIterator.hasNext()) {
+        final String key = keyIterator.next();
+        HoodieRecord<T> record = (HoodieRecord<T>) recordMap.get(key);
+        init(record);
+        // For logCompaction operations all the records are read and written as a huge block.

Review Comment:
   Yeah, basically we want to create huge HFile blocks.
   Let me use the previous logic then, which supports writing multiple blocks, and we will use HoodieStorageConfig.LOGFILE_DATA_BLOCK_MAX_SIZE to write large blocks.
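   
   For example, a sketch of how a job could raise the block size (assuming the standard HoodieWriteConfig builder; the string key is assumed to be the one behind HoodieStorageConfig.LOGFILE_DATA_BLOCK_MAX_SIZE):
   
   import java.util.Properties;
   
   import org.apache.hudi.config.HoodieWriteConfig;
   
   public class LargeLogBlockConfigSketch {
     public static void main(String[] args) {
       Properties props = new Properties();
       // Assumed key for HoodieStorageConfig.LOGFILE_DATA_BLOCK_MAX_SIZE; raise it so log
       // compaction rolls over into a few large blocks instead of many small ones.
       props.setProperty("hoodie.logfile.data.block.max.size", String.valueOf(256 * 1024 * 1024));
   
       HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
           .withPath("/tmp/hudi_table")   // placeholder base path
           .withProperties(props)
           .build();
       System.out.println(writeConfig.getProps().getProperty("hoodie.logfile.data.block.max.size"));
     }
   }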



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java:
##########
@@ -161,6 +181,17 @@ public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context,
         shouldRollbackUsingMarkers).execute();
   }
 
+  @Override
+  public Iterator<List<WriteStatus>> handlePreppedInserts(String instantTime, String partitionPath, String fileId,

Review Comment:
   Yeah, it makes sense to rename this to the handleInsertsForLogCompaction method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org